You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by av...@apache.org on 2020/10/30 20:14:31 UTC

[ozone] branch HDDS-3698-upgrade updated: HDDS-4296. SCM changes to process Layout Info in heartbeat request/response (#1486)

This is an automated email from the ASF dual-hosted git repository.

avijayan pushed a commit to branch HDDS-3698-upgrade
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3698-upgrade by this push:
     new 8e29229  HDDS-4296. SCM changes to process Layout Info in heartbeat request/response (#1486)
8e29229 is described below

commit 8e29229dc66a0ce5fd63629b08f120c22827cf86
Author: prashantpogde <pr...@gmail.com>
AuthorDate: Fri Oct 30 13:11:37 2020 -0700

    HDDS-4296. SCM changes to process Layout Info in heartbeat request/response (#1486)
---
 .../upgrade/AbstractLayoutVersionManager.java      | 17 +++++
 .../FinalizeNewLayoutVersionCommandHandler.java    | 15 +----
 .../commands/FinalizeNewLayoutVersionCommand.java  |  7 +++
 .../interface-client/src/main/proto/hdds.proto     |  2 +
 .../apache/hadoop/hdds/scm/node/NodeManager.java   | 11 ++++
 .../hadoop/hdds/scm/node/SCMNodeManager.java       | 64 ++++++++++++++++++-
 .../scm/server/SCMDatanodeHeartbeatDispatcher.java | 18 ++++++
 .../hadoop/hdds/scm/container/MockNodeManager.java | 12 ++++
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   | 72 +++++++++++++++++++++-
 .../testutils/ReplicationNodeManagerMock.java      | 11 ++++
 .../ozone/om/upgrade/OMUpgradeFinalizer.java       |  1 +
 11 files changed, 214 insertions(+), 16 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
index 6da1ec3..456cf7f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
@@ -18,6 +18,10 @@
 
 package org.apache.hadoop.ozone.upgrade;
 
+import static org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.ALREADY_FINALIZED;
+import static org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_DONE;
+import static org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status.FINALIZATION_REQUIRED;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -39,6 +43,8 @@ public abstract class AbstractLayoutVersionManager<T extends LayoutFeature>
   protected TreeMap<Integer, T> features = new TreeMap<>();
   protected Map<String, T> featureMap = new HashMap<>();
   protected volatile boolean isInitialized = false;
+  protected volatile UpgradeFinalizer.Status currentUpgradeState =
+      FINALIZATION_REQUIRED;
 
   protected void init(int version, T[] lfs) throws IOException {
 
@@ -52,10 +58,16 @@ public abstract class AbstractLayoutVersionManager<T extends LayoutFeature>
             String.format("Cannot initialize VersionManager. Metadata " +
                     "layout version (%d) > software layout version (%d)",
                 metadataLayoutVersion, softwareLayoutVersion));
+      } else if (metadataLayoutVersion == softwareLayoutVersion) {
+        currentUpgradeState = ALREADY_FINALIZED;
       }
     }
   }
 
+  public UpgradeFinalizer.Status getUpgradeState() {
+    return currentUpgradeState;
+  }
+
   private void initializeFeatures(T[] lfs) {
     Arrays.stream(lfs).forEach(f -> {
       Preconditions.checkArgument(!featureMap.containsKey(f.name()));
@@ -71,6 +83,7 @@ public abstract class AbstractLayoutVersionManager<T extends LayoutFeature>
     featureMap.clear();
     features.clear();
     isInitialized = false;
+    currentUpgradeState = ALREADY_FINALIZED;
   }
 
   public void finalized(T layoutFeature) {
@@ -92,6 +105,10 @@ public abstract class AbstractLayoutVersionManager<T extends LayoutFeature>
     }
   }
 
+  public void completeFinalization() {
+    currentUpgradeState = FINALIZATION_DONE;
+  }
+
   private boolean softwareIsBehindMetaData() {
     return metadataLayoutVersion > softwareLayoutVersion;
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeNewLayoutVersionCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeNewLayoutVersionCommandHandler.java
index dc0fdfe..34f7dc5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeNewLayoutVersionCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeNewLayoutVersionCommandHandler.java
@@ -16,18 +16,12 @@
  */
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos
-    .FinalizeNewLayoutVersionCommandProto;
 import org.apache.hadoop.ozone.container.common.statemachine
     .SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.protocol.commands.FinalizeNewLayoutVersionCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
@@ -63,16 +57,9 @@ public class FinalizeNewLayoutVersionCommandHandler implements CommandHandler {
   @Override
   public void handle(SCMCommand command, OzoneContainer ozoneContainer,
       StateContext context, SCMConnectionManager connectionManager) {
-    LOG.debug("Processing FinalizeNewLayoutVersionCommandHandler command.");
+    LOG.info("Processing FinalizeNewLayoutVersionCommandHandler command.");
     invocationCount.incrementAndGet();
     final long startTime = Time.monotonicNow();
-    final DatanodeDetails datanodeDetails = context.getParent()
-        .getDatanodeDetails();
-    final FinalizeNewLayoutVersionCommandProto finalizeCommand =
-        ((FinalizeNewLayoutVersionCommand)command).getProto();
-    final ContainerController controller = ozoneContainer.getController();
-    final boolean finalizeUpgrade =
-        finalizeCommand.getFinalizeNewLayoutVersion();
     try {
       // TODO : finalization logic
       if (LOG.isDebugEnabled()) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/FinalizeNewLayoutVersionCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/FinalizeNewLayoutVersionCommand.java
index e373f6e..74af7da 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/FinalizeNewLayoutVersionCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/FinalizeNewLayoutVersionCommand.java
@@ -44,6 +44,13 @@ public class FinalizeNewLayoutVersionCommand
     this.layoutInfo = layoutInfo;
   }
 
+  public FinalizeNewLayoutVersionCommand(boolean finalizeNewLayoutVersion,
+                                         LayoutVersionProto layoutInfo) {
+    super();
+    finalizeUpgrade = finalizeNewLayoutVersion;
+    this.layoutInfo = layoutInfo;
+  }
+
   /**
    * Returns the type of this command.
    *
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index b43a74c..f56cf2e 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -131,6 +131,8 @@ enum NodeState {
     DEAD = 3;
     DECOMMISSIONING = 4;
     DECOMMISSIONED = 5;
+    HEALTHY_READ_ONLY = 6;
+
 }
 
 enum QueryScope {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index 7853181..3952565 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdds.scm.node;
 
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
@@ -185,6 +186,16 @@ public interface NodeManager extends StorageContainerNodeProtocol,
                          NodeReportProto nodeReport);
 
   /**
+   * Process Node LayoutVersion report.
+   *
+   * @param datanodeDetails
+   * @param layoutReport
+   */
+  void processLayoutVersionReport(DatanodeDetails datanodeDetails,
+                         LayoutVersionProto layoutReport);
+
+
+  /**
    * Get list of SCMCommands in the Command Queue for a particular Datanode.
    * @param dnID - Datanode uuid.
    * @return list of commands
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index e602592..793ba40 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdds.scm.VersionInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -65,6 +66,7 @@ import org.apache.hadoop.net.TableMapping;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.FinalizeNewLayoutVersionCommand;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -92,7 +94,7 @@ import org.slf4j.LoggerFactory;
  */
 public class SCMNodeManager implements NodeManager {
 
-  private static final Logger LOG =
+  public static final Logger LOG =
       LoggerFactory.getLogger(SCMNodeManager.class);
 
   private final NodeStateManager nodeStateManager;
@@ -110,6 +112,7 @@ public class SCMNodeManager implements NodeManager {
   private final int numPipelinesPerMetadataVolume;
   private final int heavyNodeCriteria;
   private final HDDSLayoutVersionManager scmLayoutVersionManager;
+  private final EventPublisher scmNodeEventPublisher;
 
   /**
    * Constructs SCM machine Manager.
@@ -119,6 +122,7 @@ public class SCMNodeManager implements NodeManager {
                         EventPublisher eventPublisher,
                         NetworkTopology networkTopology,
                         HDDSLayoutVersionManager layoutVersionManager) {
+    this.scmNodeEventPublisher = eventPublisher;
     this.nodeStateManager = new NodeStateManager(conf, eventPublisher);
     this.version = VersionInfo.getLatestVersion();
     this.commandQueue = new CommandQueue();
@@ -401,6 +405,64 @@ public class SCMNodeManager implements NodeManager {
   }
 
   /**
+   * Process Layout Version report.
+   *
+   * @param datanodeDetails
+   * @param layoutVersionReport
+   */
+  @Override
+  public void processLayoutVersionReport(DatanodeDetails datanodeDetails,
+                                LayoutVersionProto layoutVersionReport) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing Layout Version report from [datanode={}]",
+          datanodeDetails.getHostName());
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("HB is received from [datanode={}]: <json>{}</json>",
+          datanodeDetails.getHostName(),
+          layoutVersionReport.toString().replaceAll("\n", "\\\\n"));
+    }
+
+    if (layoutVersionReport != null) {
+      int scmSlv = scmLayoutVersionManager.getSoftwareLayoutVersion();
+      int scmMlv = scmLayoutVersionManager.getMetadataLayoutVersion();
+      int dnSlv = layoutVersionReport.getSoftwareLayoutVersion();
+      int dnMlv = layoutVersionReport.getMetadataLayoutVersion();
+
+      // If the data node slv is > scm slv => log error condition
+      if (dnSlv > scmSlv) {
+        LOG.error("Rogue data node in the cluster : {}. " +
+                "DataNode SoftwareLayoutVersion = {}, SCM " +
+                "SoftwareLayoutVersion = {}",
+            datanodeDetails.getHostName(), dnSlv, scmSlv);
+      }
+
+      // If the datanode slv < scm slv, it can not be allowed to be part of
+      // any pipeline. However it can be allowed to join the cluster
+      if (dnMlv < scmMlv) {
+        LOG.warn("Data node {} can not be used in any pipeline in the " +
+                "cluster. " + "DataNode MetadataLayoutVersion = {}, SCM " +
+                "MetadataLayoutVersion = {}",
+            datanodeDetails.getHostName(), dnMlv, scmMlv);
+
+        // TBD: Add NEED_UPGRADE state and fill out state transitions
+        // around this state. Fire event to move this data node to
+        // NEED_UPGRADE state. The DataNode will be considered HEALTHY in
+        // this state but it can not be made part of any Pipeline.
+
+        // Also send Finalize command to the data node. Its OK to
+        // send Finalize command multiple times.
+        scmNodeEventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
+            new CommandForDatanode<>(datanodeDetails.getUuid(),
+                new FinalizeNewLayoutVersionCommand(true,
+                    LayoutVersionProto.newBuilder()
+                        .setSoftwareLayoutVersion(dnSlv)
+                        .setMetadataLayoutVersion(dnSlv).build())));
+      }
+    }
+  }
+
+  /**
    * Returns the aggregated node stats.
    *
    * @return the aggregated node stats.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index 3dbb4cb..8447b19 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -21,6 +21,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -101,6 +102,11 @@ public final class SCMDatanodeHeartbeatDispatcher {
       commands = nodeManager.getCommandQueue(dnID);
 
     } else {
+      if (heartbeat.hasDataNodeLayoutVersion()) {
+        LOG.debug("Processing DataNode Layout Report.");
+        nodeManager.processLayoutVersionReport(datanodeDetails,
+            heartbeat.getDataNodeLayoutVersion());
+      }
 
       // should we dispatch heartbeat through eventPublisher?
       commands = nodeManager.processHeartbeat(datanodeDetails);
@@ -215,6 +221,18 @@ public final class SCMDatanodeHeartbeatDispatcher {
   }
 
   /**
+   * Layout report event payload with origin.
+   */
+  public static class LayoutReportFromDatanode
+      extends ReportFromDatanode<LayoutVersionProto> {
+
+    public LayoutReportFromDatanode(DatanodeDetails datanodeDetails,
+                                  LayoutVersionProto report) {
+      super(datanodeDetails, report);
+    }
+  }
+
+  /**
    * Container report event payload with origin.
    */
   public static class ContainerReportFromDatanode
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index 42369f9..a3c09d8 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -360,6 +360,18 @@ public class MockNodeManager implements NodeManager {
   }
 
   /**
+   * Empty implementation for processLayoutVersionReport.
+   *
+   * @param dnUuid
+   * @param layoutReport
+   */
+  @Override
+  public void processLayoutVersionReport(DatanodeDetails dnUuid,
+                                         LayoutVersionProto layoutReport) {
+    // do nothing
+  }
+
+  /**
    * Update set of containers available on a datanode.
    * @param uuid - DatanodeID
    * @param containerIds - Set of containerIDs
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index 3222a9a..9be6cc9 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
+import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
@@ -46,9 +47,11 @@ import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
@@ -67,6 +70,7 @@ import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanode
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.finalizeNewLayoutVersionCommand;
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto.ErrorCode.errorNodeNotPermitted;
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto.ErrorCode.success;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
@@ -74,17 +78,23 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCE
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
 import static org.apache.hadoop.hdds.scm.TestUtils.getRandomPipelineReports;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.*;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND;
 import org.junit.After;
 import org.junit.Assert;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 /**
@@ -479,6 +489,66 @@ public class TestSCMNodeManager {
     }
   }
 
+  @Test
+  public void testProcessLayoutVersionReportHigherMlv() throws IOException,
+      AuthenticationException {
+    final int healthCheckInterval = 200; // milliseconds
+    final int heartbeatInterval = 1; // seconds
+
+    OzoneConfiguration conf = getConf();
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL,
+        healthCheckInterval, MILLISECONDS);
+    conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL,
+        heartbeatInterval, SECONDS);
+
+    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      DatanodeDetails node1 =
+          TestUtils.createRandomDatanodeAndRegister(nodeManager);
+      GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+          .captureLogs(SCMNodeManager.LOG);
+      int scmMlv =
+          nodeManager.getLayoutVersionManager().getMetadataLayoutVersion();
+      nodeManager.processLayoutVersionReport(node1,
+          LayoutVersionProto.newBuilder()
+              .setMetadataLayoutVersion(scmMlv + 1)
+              .setSoftwareLayoutVersion(scmMlv + 2)
+              .build());
+      Assert.assertTrue(logCapturer.getOutput()
+          .contains("Rogue data node in the cluster"));
+    }
+  }
+
+  @Test
+  public void testProcessLayoutVersionLowerMlv() throws IOException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    SCMStorageConfig scmStorageConfig = mock(SCMStorageConfig.class);
+    when(scmStorageConfig.getClusterID()).thenReturn("xyz111");
+    EventPublisher eventPublisher = mock(EventPublisher.class);
+    HDDSLayoutVersionManager lvm  =
+        HDDSLayoutVersionManager.initialize(scmStorageConfig);
+    SCMNodeManager nodeManager  = new SCMNodeManager(conf,
+        scmStorageConfig, eventPublisher, new NetworkTopologyImpl(conf), lvm);
+    DatanodeDetails node1 =
+        TestUtils.createRandomDatanodeAndRegister(nodeManager);
+    verify(eventPublisher,
+        times(1)).fireEvent(NEW_NODE, node1);
+    int scmMlv =
+        nodeManager.getLayoutVersionManager().getMetadataLayoutVersion();
+    nodeManager.processLayoutVersionReport(node1,
+        LayoutVersionProto.newBuilder()
+            .setMetadataLayoutVersion(scmMlv - 1)
+            .setSoftwareLayoutVersion(scmMlv)
+            .build());
+    ArgumentCaptor<CommandForDatanode> captor =
+        ArgumentCaptor.forClass(CommandForDatanode.class);
+    verify(eventPublisher, times(1))
+        .fireEvent(Mockito.eq(DATANODE_COMMAND), captor.capture());
+    assertTrue(captor.getValue().getDatanodeId()
+        .equals(node1.getUuid()));
+    assertTrue(captor.getValue().getCommand().getType()
+        .equals(finalizeNewLayoutVersionCommand));
+  }
+
   /**
    * Check for NPE when datanodeDetails is passed null for sendHeartbeat.
    *
@@ -988,7 +1058,7 @@ public class TestSCMNodeManager {
       DatanodeDetails datanodeDetails =
           TestUtils.createRandomDatanodeAndRegister(nodeManager);
       NodeReportHandler nodeReportHandler = new NodeReportHandler(nodeManager);
-      EventPublisher publisher = Mockito.mock(EventPublisher.class);
+      EventPublisher publisher = mock(EventPublisher.class);
       final long capacity = 2000;
       final long usedPerHeartbeat = 100;
       UUID dnId = datanodeDetails.getUuid();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 6d66a4b..a30de35 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -322,6 +322,17 @@ public class ReplicationNodeManagerMock implements NodeManager {
     // do nothing.
   }
 
+  /**
+   * Empty implementation for processLayoutVersionReport.
+   * @param dnUuid
+   * @param layoutVersionReport
+   */
+  @Override
+  public void processLayoutVersionReport(DatanodeDetails dnUuid,
+                                LayoutVersionProto layoutVersionReport) {
+    // do nothing.
+  }
+
   @Override
   public void onMessage(CommandForDatanode commandForDatanode,
                         EventPublisher publisher) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizer.java
index 85e1382..4882fcf 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMUpgradeFinalizer.java
@@ -144,6 +144,7 @@ public class OMUpgradeFinalizer implements UpgradeFinalizer<OzoneManager> {
           versionManager.finalized(f);
         }
 
+        versionManager.completeFinalization();
         emitFinishedMsg();
       } finally {
         isDone = true;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org