Posted to commits@ozone.apache.org by av...@apache.org on 2021/07/02 06:36:01 UTC

[ozone] 01/01: Merge remote-tracking branch 'ssh-upstream/master' into ssh-upstream-upgrade-branch

This is an automated email from the ASF dual-hosted git repository.

avijayan pushed a commit to branch HDDS-3698-nonrolling-upgrade
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 039afb37ce6fb4522e6d32de974c0cc96d868592
Merge: 4506883 1d8f972
Author: Aravindan Vijayan <av...@cloudera.com>
AuthorDate: Thu Jul 1 23:35:16 2021 -0700

    Merge remote-tracking branch 'ssh-upstream/master' into ssh-upstream-upgrade-branch

 .github/workflows/post-commit.yml                  |   8 +-
 .../hadoop/hdds/scm/storage/BlockOutputStream.java |   6 +-
 .../hadoop/hdds/fs/AbstractSpaceUsageSource.java   |   7 +-
 .../hadoop/hdds/fs/DedicatedDiskSpaceUsage.java    |   8 +-
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |  86 +----
 .../apache/hadoop/hdds/scm/client/ScmClient.java   |  21 +
 .../hadoop/hdds/scm/container/ContainerInfo.java   |   5 +
 .../hadoop/hdds/scm/ha/SCMHAConfiguration.java     |   4 +-
 .../protocol/StorageContainerLocationProtocol.java |  21 +
 .../hadoop/hdds/security/x509/crl/CRLStatus.java   |  87 +++++
 .../org/apache/hadoop/ozone/audit/SCMAction.java   |   3 +
 .../common/src/main/resources/ozone-default.xml    | 141 +------
 .../server/ratis/ContainerStateMachine.java        |  74 ++--
 .../common/volume/StorageVolumeChecker.java        |  20 +-
 .../container/stream/DirectoryServerSource.java    |   6 +-
 .../container/stream/DirstreamClientHandler.java   |   5 +
 .../container/stream/DirstreamServerHandler.java   |  43 ++-
 .../ozone/container/stream/StreamingClient.java    |  28 +-
 .../container/stream/StreamingException.java}      |  27 +-
 .../ozone/container/stream/StreamingServer.java    |  68 ++--
 .../stream/TestDirstreamClientHandler.java         | 139 +++++++
 .../container/stream/TestStreamingServer.java      |  48 +++
 .../ozone/container/stream/package-info.java}      |  43 +--
 hadoop-hdds/docs/content/interface/O3fs.md         |   8 +
 hadoop-hdds/docs/content/interface/O3fs.zh.md      |   8 +
 ...inerLocationProtocolClientSideTranslatorPB.java |  73 ++++
 .../SCMBlockLocationFailoverProxyProvider.java     |   8 +-
 .../SCMSecurityProtocolFailoverProxyProvider.java  |   8 +-
 .../scm/update/client/CRLClientUpdateHandler.java  |   2 +-
 .../certificate/authority/CertificateStore.java    |   6 +
 .../authority/PKIProfiles/DefaultProfile.java      |   2 +-
 .../hadoop/hdds/security/x509/crl/CRLCodec.java    |   0
 .../hadoop/hdds/security/x509/crl/CRLInfo.java     |   0
 .../hdds/security/x509/crl/CRLInfoCodec.java       |   0
 .../hadoop/hdds/server/events/EventExecutor.java   |   5 +
 .../hadoop/hdds/server/events/EventQueue.java      |  44 ++-
 ...dExecutor.java => FixedThreadPoolExecutor.java} |  42 +-
 .../hdds/server/events/SingleThreadExecutor.java   |   9 +
 .../hadoop/hdds/server/http/ProfileServlet.java    |   2 +-
 .../apache/hadoop/hdds/utils/HddsServerUtil.java   |   9 +
 .../x509/certificate/authority/MockCAStore.java    |  12 +
 .../hadoop/hdds/server/events/TestEventQueue.java  |  62 ++-
 .../src/main/proto/ScmAdminProtocol.proto          |  50 ++-
 hadoop-hdds/interface-client/pom.xml               |  13 -
 hadoop-hdds/interface-server/pom.xml               |   2 +
 .../src/main/proto/SCMUpdateProtocol.proto         |   0
 .../hadoop/hdds/scm/SCMCommonPlacementPolicy.java  |  31 +-
 .../block/DatanodeDeletedBlockTransactions.java    |   6 -
 .../hdds/scm/block/DeletedBlockLogImplV2.java      |   1 +
 .../scm/block/DeletedBlockLogStateManagerImpl.java |  26 +-
 .../hdds/scm/block/PendingDeleteStatusList.java    |  85 ----
 .../hdds/scm/block/SCMBlockDeletingService.java    |  24 --
 .../hdds/scm/container/ContainerReportHandler.java |  38 --
 .../scm/container/ContainerStateManagerImpl.java   |   2 +-
 .../hdds/scm/container/ReplicationManager.java     |  11 +-
 .../scm/container/balancer/ContainerBalancer.java  | 101 +++--
 .../balancer/ContainerBalancerConfiguration.java   |  28 ++
 .../hdds/scm/crl/CRLStatusReportHandler.java       |  87 +++++
 .../apache/hadoop/hdds/scm/crl/package-info.java}  |  43 +--
 .../apache/hadoop/hdds/scm/events/SCMEvents.java   |  19 +-
 .../hadoop/hdds/scm/ha/SCMRatisServerImpl.java     |   8 +-
 .../apache/hadoop/hdds/scm/node/DatanodeInfo.java  |  14 +
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java |  15 +-
 .../scm/pipeline/PipelineStateManagerV2Impl.java   |   4 +-
 ...inerLocationProtocolServerSideTranslatorPB.java |  70 ++++
 .../scm/server/OzoneStorageContainerManager.java   |   3 +
 .../hdds/scm/server/SCMBlockProtocolServer.java    |   2 +
 .../hadoop/hdds/scm/server/SCMCertStore.java       |  20 +-
 .../hdds/scm/server/SCMClientProtocolServer.java   |  65 ++++
 .../scm/server/SCMDatanodeHeartbeatDispatcher.java |  15 +-
 .../hdds/scm/server/SCMDatanodeProtocolServer.java |   3 +
 .../hdds/scm/server/StorageContainerManager.java   |  90 +++--
 .../java/org/apache/hadoop/hdds/scm/TestUtils.java | 113 ++++--
 .../hadoop/hdds/scm/block/TestDeletedBlockLog.java |  96 ++++-
 .../hadoop/hdds/scm/container/MockNodeManager.java |   8 +
 .../container/balancer/TestContainerBalancer.java  |  20 +-
 .../algorithms/TestContainerPlacementFactory.java  |  11 +
 .../TestSCMContainerPlacementCapacity.java         |  16 +-
 .../TestSCMContainerPlacementRackAware.java        |  22 +-
 .../TestSCMContainerPlacementRandom.java           |  19 +-
 .../hdds/scm/crl/TestCRLStatusReportHandler.java   | 137 +++++++
 .../hadoop/hdds/scm/node/TestDeadNodeHandler.java  |  49 ++-
 .../hdds/scm/node/TestNodeReportHandler.java       |  21 +-
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   |  25 +-
 .../hdds/scm/node/TestSCMNodeStorageStatMap.java   |  11 +-
 .../hadoop/hdds/scm/node/TestStatisticsUpdate.java |  14 +-
 .../scm/pipeline/TestPipelinePlacementPolicy.java  |  33 +-
 .../hdds/scm/update/server/MockCRLStore.java       |   6 +-
 .../ozone/container/common/TestEndPoint.java       |  25 +-
 .../hdds/scm/cli/ContainerBalancerCommands.java    | 108 ++++++
 .../scm/cli/ContainerBalancerStartSubcommand.java  |  66 ++++
 .../cli/ContainerBalancerStatusSubcommand.java}    |  38 +-
 .../scm/cli/ContainerBalancerStopSubcommand.java}  |  35 +-
 .../hdds/scm/cli/ContainerOperationClient.java     |  20 +
 .../datanode/TestContainerBalancerSubCommand.java  | 141 +++++++
 .../hadoop/ozone/client/MockOmTransport.java       |  12 +
 .../hadoop/ozone/client/TestOzoneClient.java       |  34 +-
 .../src/main/compose/compatibility/docker-config   |   1 +
 .../dist/src/main/compose/ozone-csi/docker-config  |   1 +
 .../dist/src/main/compose/ozone-ha/docker-config   |   1 +
 .../dist/src/main/compose/ozone-mr/common-config   |   1 +
 .../dist/src/main/compose/ozone-mr/test.sh         |   6 +-
 .../src/main/compose/ozone-om-ha/docker-config     |   1 +
 .../src/main/compose/ozone-topology/docker-config  |   1 +
 .../dist/src/main/compose/ozone/docker-config      |   1 +
 .../src/main/compose/ozones3-haproxy/docker-config |   1 +
 .../src/main/compose/ozonesecure-ha/docker-config  |   1 +
 .../src/main/compose/ozonesecure-mr/docker-config  |   2 +
 .../dist/src/main/compose/restart/docker-config    |   1 +
 hadoop-ozone/dist/src/main/compose/test-all.sh     |   8 +-
 .../compose/upgrade/compose/non-ha/docker-config   |   3 +-
 hadoop-ozone/dist/src/main/compose/upgrade/test.sh |   3 +
 .../dist/src/main/compose/xcompat/docker-config    |   1 +
 .../src/main/smoketest/ozonefs/hadoopo3fs.robot    |   2 +
 .../apache/hadoop/ozone/MiniOzoneChaosCluster.java |   3 +
 .../ozone/TestContainerBalancerOperations.java     | 112 ++++++
 .../commandhandler/TestBlockDeletion.java          |  47 ---
 .../commandhandler/TestCloseContainerHandler.java  |   4 +
 .../commandhandler/TestDeleteContainerHandler.java |   4 +
 .../TestDatanodeHddsVolumeFailureDetection.java    |   4 +
 .../TestDatanodeHddsVolumeFailureToleration.java   |   4 +
 .../hadoop/ozone/fsck/TestContainerMapper.java     |   5 +
 .../src/main/proto/OmClientProtocol.proto          |   1 +
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |   2 +-
 .../java/org/apache/hadoop/ozone/om/OMMetrics.java |   1 -
 .../apache/hadoop/ozone/om/TrashPolicyOzone.java   |  20 +-
 .../request/bucket/OMBucketSetPropertyRequest.java |  19 +-
 .../bucket/TestOMBucketSetPropertyRequest.java     |  10 +
 hadoop-ozone/ozonefs-shaded/pom.xml                |   3 +
 .../hadoop/ozone/recon/api/NodeEndpoint.java       |  17 +-
 .../ozone/recon/api/types/DatanodeMetadata.java    |  15 +
 .../ozone/recon/scm/ReconContainerManager.java     |  18 +
 .../scm/ReconStorageContainerManagerFacade.java    |   6 +
 .../webapps/recon/ozone-recon-web/api/db.json      |  20 +
 .../src/views/datanodes/datanodes.tsx              |  11 +
 .../hadoop/ozone/recon/api/TestEndpoints.java      |   3 +-
 .../ozone/recon/api/TestOpenContainerCount.java    | 427 +++++++++++++++++++++
 .../ozone/audit/parser/common/DatabaseHelper.java  |   7 +-
 .../hadoop/ozone/freon/StreamingGenerator.java     |  23 +-
 139 files changed, 3046 insertions(+), 929 deletions(-)

diff --cc hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index 60c3931,31c4472..1701e4c
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@@ -36,10 -36,9 +36,11 @@@ import java.util.EnumSet
  import java.util.Collections;
  import java.util.List;
  import java.util.Map;
+ import java.util.Optional;
  import java.util.Set;
  
 +import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
 +
  /**
   * ContainerLocationProtocol is used by an HDFS node to find the set of nodes
   * that currently host a container.
diff --cc hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
index 6313579,d67dce9..3728a49
--- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
+++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
@@@ -69,9 -69,9 +69,11 @@@ message ScmContainerLocationRequest 
    optional DatanodeUsageInfoRequestProto DatanodeUsageInfoRequest = 30;
    optional GetExistContainerWithPipelinesInBatchRequestProto getExistContainerWithPipelinesInBatchRequest = 31;
    optional GetContainerTokenRequestProto containerTokenRequest = 32;
-   optional FinalizeScmUpgradeRequestProto finalizeScmUpgradeRequest = 33;
-   optional QueryUpgradeFinalizationProgressRequestProto
-   queryUpgradeFinalizationProgressRequest = 34;
+   optional StartContainerBalancerRequestProto startContainerBalancerRequest = 33;
+   optional StopContainerBalancerRequestProto stopContainerBalancerRequest = 34;
+   optional ContainerBalancerStatusRequestProto containerBalancerStatusRequest = 35;
++  optional FinalizeScmUpgradeRequestProto finalizeScmUpgradeRequest = 36;
++  optional QueryUpgradeFinalizationProgressRequestProto queryUpgradeFinalizationProgressRequest = 37;
  }
  
  message ScmContainerLocationResponse {
@@@ -112,9 -112,9 +114,11 @@@
    optional DatanodeUsageInfoResponseProto DatanodeUsageInfoResponse = 30;
    optional GetExistContainerWithPipelinesInBatchResponseProto getExistContainerWithPipelinesInBatchResponse = 31;
    optional GetContainerTokenResponseProto containerTokenResponse = 32;
-   optional FinalizeScmUpgradeResponseProto finalizeScmUpgradeResponse = 33;
-   optional QueryUpgradeFinalizationProgressResponseProto
-   queryUpgradeFinalizationProgressResponse = 34;
+   optional StartContainerBalancerResponseProto startContainerBalancerResponse = 33;
+   optional StopContainerBalancerResponseProto stopContainerBalancerResponse = 34;
+   optional ContainerBalancerStatusResponseProto containerBalancerStatusResponse = 35;
++  optional FinalizeScmUpgradeResponseProto finalizeScmUpgradeResponse = 36;
++  optional QueryUpgradeFinalizationProgressResponseProto queryUpgradeFinalizationProgressResponse = 37;
  
    enum Status {
      OK = 1;
@@@ -153,8 -153,9 +157,11 @@@ enum Type 
    DatanodeUsageInfo = 25;
    GetExistContainerWithPipelinesInBatch = 26;
    GetContainerToken = 27;
-   FinalizeScmUpgrade = 28;
-   QueryUpgradeFinalizationProgress = 29;
+   StartContainerBalancer = 28;
+   StopContainerBalancer = 29;
+   GetContainerBalancerStatus = 30;
++  FinalizeScmUpgrade = 31;
++  QueryUpgradeFinalizationProgress = 32;
  }
  
  /**
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 6f9030c,a374766..d9872ce
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@@ -671,37 -675,64 +677,96 @@@ public class SCMClientProtocolServer im
    }
  
    @Override
 +  public StatusAndMessages finalizeScmUpgrade(String upgradeClientID) throws
 +      IOException {
 +    // check admin authorization
 +    try {
 +      getScm().checkAdminAccess(getRemoteUser());
 +    } catch (IOException e) {
 +      LOG.error("Authorization failed for finalize scm upgrade", e);
 +      throw e;
 +    }
 +    return scm.finalizeUpgrade(upgradeClientID);
 +  }
 +
 +  @Override
 +  public StatusAndMessages queryUpgradeFinalizationProgress(
 +      String upgradeClientID, boolean force, boolean readonly)
 +      throws IOException {
 +    if (!readonly) {
 +      // check admin authorization
 +      try {
 +        getScm().checkAdminAccess(getRemoteUser());
 +      } catch (IOException e) {
 +        LOG.error("Authorization failed for query scm upgrade finalization " +
 +            "progress", e);
 +        throw e;
 +      }
 +    }
 +
 +    return scm.queryUpgradeFinalizationProgress(upgradeClientID, force,
 +        readonly);
 +  }
 +
++  @Override
+   public boolean startContainerBalancer(Optional<Double> threshold,
+                   Optional<Integer> idleiterations,
+                   Optional<Integer> maxDatanodesToBalance,
+                   Optional<Long> maxSizeToMoveInGB) throws IOException {
+     getScm().checkAdminAccess(getRemoteUser());
+     ContainerBalancerConfiguration cbc = new ContainerBalancerConfiguration();
+     if (threshold.isPresent()) {
+       double tsd = threshold.get();
+       Preconditions.checkState(tsd >= 0.0D && tsd < 1.0D,
+           "threshold should to be specified in range [0.0, 1.0).");
+       cbc.setThreshold(tsd);
+     }
+     if (maxSizeToMoveInGB.isPresent()) {
+       long mstm = maxSizeToMoveInGB.get();
+       Preconditions.checkState(mstm > 0,
+           "maxSizeToMoveInGB must be positive.");
+       cbc.setMaxSizeToMove(mstm * OzoneConsts.GB);
+     }
+     if (maxDatanodesToBalance.isPresent()) {
+       int mdtb = maxDatanodesToBalance.get();
+       Preconditions.checkState(mdtb > 0,
+           "maxDatanodesToBalance must be positive.");
+       cbc.setMaxDatanodesToBalance(mdtb);
+     }
+     if (idleiterations.isPresent()) {
+       int idi = idleiterations.get();
+       Preconditions.checkState(idi > 0 || idi == -1,
+           "idleiterations must be positive or" +
+               " -1(infinitly run container balancer).");
+       cbc.setIdleIteration(idi);
+     }
+ 
+     boolean isStartedSuccessfully = scm.getContainerBalancer().start(cbc);
+     if (isStartedSuccessfully) {
+       AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+           SCMAction.START_CONTAINER_BALANCER, null));
+     } else {
+       AUDIT.logWriteFailure(buildAuditMessageForFailure(
+           SCMAction.START_CONTAINER_BALANCER, null, null));
+     }
+     return isStartedSuccessfully;
+   }
+ 
+   @Override
+   public void stopContainerBalancer() throws IOException {
+     getScm().checkAdminAccess(getRemoteUser());
+     AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+         SCMAction.STOP_CONTAINER_BALANCER, null));
+     scm.getContainerBalancer().stop();
+   }
+ 
+   @Override
+   public boolean getContainerBalancerStatus() {
+     AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+         SCMAction.GET_CONTAINER_BALANCER_STATUS, null));
+     return scm.getContainerBalancer().isBalancerRunning();
+   }
+ 
    /**
     * Get Datanode usage info such as capacity, SCMUsed, and remaining by ip
     * or uuid.
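
For context, startContainerBalancer above applies only the parameters the caller actually supplied; everything else keeps its ContainerBalancerConfiguration default. Below is a minimal standalone sketch of that Optional-override pattern, using stand-in names (BalancerConfig, GB) rather than Ozone's real API:

    import java.util.Optional;

    // Minimal sketch of the "override only what was supplied" pattern.
    public final class OptionalOverrideSketch {

      static final long GB = 1024L * 1024 * 1024;

      // Illustrative stand-in for ContainerBalancerConfiguration defaults.
      static final class BalancerConfig {
        double threshold = 0.1;
        long maxSizeToMoveBytes = 10 * GB;
      }

      // Only caller-supplied values override the defaults, after the same
      // range checks the server performs above.
      static BalancerConfig build(Optional<Double> threshold,
          Optional<Long> maxSizeToMoveInGB) {
        BalancerConfig cbc = new BalancerConfig();
        threshold.ifPresent(t -> {
          if (t < 0.0 || t >= 1.0) {
            throw new IllegalStateException(
                "threshold must be specified in the range [0.0, 1.0).");
          }
          cbc.threshold = t;
        });
        maxSizeToMoveInGB.ifPresent(m -> {
          if (m <= 0) {
            throw new IllegalStateException(
                "maxSizeToMoveInGB must be positive.");
          }
          cbc.maxSizeToMoveBytes = m * GB; // converted to bytes, as above
        });
        return cbc;
      }

      public static void main(String[] args) {
        BalancerConfig cbc = build(Optional.of(0.2), Optional.empty());
        System.out.println(cbc.threshold + " " + cbc.maxSizeToMoveBytes);
      }
    }
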
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index 3439933,5a69121..745be9d
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@@ -20,8 -20,9 +20,10 @@@ package org.apache.hadoop.hdds.scm.serv
  import com.google.common.base.Preconditions;
  import org.apache.hadoop.hdds.protocol.DatanodeDetails;
  import org.apache.hadoop.hdds.protocol.proto
+     .StorageContainerDatanodeProtocolProtos.CRLStatusReport;
+ import org.apache.hadoop.hdds.protocol.proto
      .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
 +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
  import org.apache.hadoop.hdds.protocol.proto
      .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
  import org.apache.hadoop.hdds.protocol.proto
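
For context, the imports above reflect that CRL status reports now travel in datanode heartbeats alongside the existing report types. The dispatcher's job is to unpack each report from a heartbeat, wrap it with the reporting datanode, and fire it as a typed event for its handler. A simplified, self-contained sketch of that pattern, with stand-in types rather than Ozone's EventQueue API:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Consumer;

    // Simplified sketch of the heartbeat-dispatch pattern; types here
    // are stand-ins, not Ozone's EventQueue/Event API.
    public final class DispatcherSketch {

      static final class EventQueue {
        private final Map<String, List<Consumer<Object>>> handlers =
            new HashMap<>();

        void addHandler(String event, Consumer<Object> handler) {
          handlers.computeIfAbsent(event, k -> new ArrayList<>()).add(handler);
        }

        void fireEvent(String event, Object payload) {
          for (Consumer<Object> h
              : handlers.getOrDefault(event, new ArrayList<>())) {
            h.accept(payload);
          }
        }
      }

      public static void main(String[] args) {
        EventQueue queue = new EventQueue();
        queue.addHandler("CRL_STATUS_REPORT",
            payload -> System.out.println("CRL status handled: " + payload));
        // The dispatcher does the equivalent of this for every report
        // present in an incoming heartbeat.
        queue.fireEvent("CRL_STATUS_REPORT", "dn-1: receivedCrlIds=[1, 2]");
      }
    }
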
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 0f8ea75,b75112c..109e5e4
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@@ -375,19 -390,13 +404,17 @@@ public final class StorageContainerMana
          pipelineManager, containerManager);
      StartDatanodeAdminHandler datanodeStartAdminHandler =
          new StartDatanodeAdminHandler(scmNodeManager, pipelineManager);
 -    NonHealthyToHealthyNodeHandler nonHealthyToHealthyNodeHandler =
 -        new NonHealthyToHealthyNodeHandler(configuration, serviceManager);
 +    ReadOnlyHealthyToHealthyNodeHandler readOnlyHealthyToHealthyNodeHandler =
-         new ReadOnlyHealthyToHealthyNodeHandler(conf, serviceManager);
++        new ReadOnlyHealthyToHealthyNodeHandler(configuration, serviceManager);
 +    HealthyReadOnlyNodeHandler
 +        healthyReadOnlyNodeHandler =
 +        new HealthyReadOnlyNodeHandler(scmNodeManager,
-             pipelineManager, conf);
++            pipelineManager, configuration);
      ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
-     PendingDeleteHandler pendingDeleteHandler =
-         new PendingDeleteHandler(scmBlockManager.getSCMBlockDeletingService());
  
      ContainerReportHandler containerReportHandler =
-         new ContainerReportHandler(
-             scmNodeManager, containerManager, scmContext, conf);
+         new ContainerReportHandler(scmNodeManager, containerManager,
+             scmContext, configuration);
  
      IncrementalContainerReportHandler incrementalContainerReportHandler =
          new IncrementalContainerReportHandler(
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index af0538a,4d364ef..db145c7
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@@ -226,16 -232,23 +232,32 @@@ public final class TestUtils 
          StorageTypeProto.DISK);
    }
  
+   /**
+    * Generates random metadata storage report.
+    *
+    * @param path path of the storage
+    *
+    * @return MetadataStorageReportProto
+    */
+   public static MetadataStorageReportProto getRandomMetadataStorageReport(
+       String path) {
+     return createMetadataStorageReport(path,
+         random.nextInt(1000),
+         random.nextInt(500),
+         random.nextInt(500),
+         StorageTypeProto.DISK);
+   }
+ 
    public static StorageReportProto createStorageReport(UUID nodeId, String path,
 +      long capacity) {
 +    return createStorageReport(nodeId, path,
 +        capacity,
 +        0,
 +        capacity,
 +        StorageTypeProto.DISK);
 +  }
 +
 +  public static StorageReportProto createStorageReport(UUID nodeId, String path,
         long capacity, long used, long remaining, StorageTypeProto type) {
      return createStorageReport(nodeId, path, capacity, used, remaining,
              type, false);
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java
index 33272bd,f1ff0d9..411e59f
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java
@@@ -34,9 -35,10 +35,11 @@@ import org.apache.hadoop.hdds.scm.node.
  import org.apache.hadoop.hdds.scm.node.NodeManager;
  
  import org.apache.hadoop.hdds.scm.node.NodeStatus;
 +import org.apache.hadoop.ozone.container.upgrade.UpgradeUtils;
  import org.junit.Assert;
  import org.junit.Test;
+ 
+ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN;
  import static org.mockito.Matchers.anyObject;
  import org.mockito.Mockito;
  import static org.mockito.Mockito.when;
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java
index 934d986,d91f733..436a1e8
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java
@@@ -16,9 -16,9 +16,11 @@@
   */
  package org.apache.hadoop.hdds.scm.node;
  
 +import static org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager.maxLayoutVersion;
 +
  import java.io.IOException;
+ import java.util.Arrays;
+ import java.util.List;
  import java.util.UUID;
  import org.apache.hadoop.hdds.conf.OzoneConfiguration;
  import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@@ -52,10 -52,11 +55,12 @@@ public class TestNodeReportHandler impl
    private static final Logger LOG = LoggerFactory
        .getLogger(TestNodeReportHandler.class);
    private NodeReportHandler nodeReportHandler;
 +  private HDDSLayoutVersionManager versionManager;
    private SCMNodeManager nodeManager;
    private String storagePath = GenericTestUtils.getRandomizedTempPath()
-       .concat("/" + UUID.randomUUID().toString());
+       .concat("/data-" + UUID.randomUUID().toString());
+   private String metaStoragePath = GenericTestUtils.getRandomizedTempPath()
+       .concat("/metadata-" + UUID.randomUUID().toString());
  
    @Before
    public void resetEventCollector() throws IOException {
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index 80886cd,17c6a0f..4554b53
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@@ -231,241 -176,6 +231,242 @@@ public class TestSCMNodeManager 
    }
  
    /**
 +   * Tests that node manager handles layout version changes from heartbeats
 +   * correctly.
 +   *
 +   * @throws IOException
 +   * @throws InterruptedException
 +   * @throws TimeoutException
 +   */
 +  @Test
 +  public void testScmLayoutOnHeartbeat() throws Exception {
 +    OzoneConfiguration conf = getConf();
 +    conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL,
 +        1, TimeUnit.DAYS);
 +
 +    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
 +      Assert.assertTrue(scm.checkLeader());
 +      // Register 2 nodes correctly.
 +      // These will be used with a faulty node to test pipeline creation.
 +      DatanodeDetails goodNode1 = registerWithCapacity(nodeManager);
 +      DatanodeDetails goodNode2 = registerWithCapacity(nodeManager);
 +
 +      scm.exitSafeMode();
 +
 +      assertPipelineClosedAfterLayoutHeartbeat(goodNode1, goodNode2,
 +          nodeManager, SMALLER_MLV_LAYOUT_PROTO);
 +      assertPipelineClosedAfterLayoutHeartbeat(goodNode1, goodNode2,
 +          nodeManager, LARGER_MLV_SLV_LAYOUT_PROTO);
 +      assertPipelineClosedAfterLayoutHeartbeat(goodNode1, goodNode2,
 +          nodeManager, SMALLER_MLV_SLV_LAYOUT_PROTO);
 +      assertPipelineClosedAfterLayoutHeartbeat(goodNode1, goodNode2,
 +          nodeManager, LARGER_SLV_LAYOUT_PROTO);
 +    }
 +  }
 +
 +  /**
 +   * Create {@link DatanodeDetails} to register with {@code nodeManager}, and
 +   * provide the datanode maximum capacity so that space used does not block
 +   * pipeline creation.
 +   * @return The created {@link DatanodeDetails}.
 +   */
 +  private DatanodeDetails registerWithCapacity(SCMNodeManager nodeManager) {
 +    return registerWithCapacity(nodeManager,
 +        UpgradeUtils.defaultLayoutVersionProto(), success);
 +  }
 +
 +  /**
 +   * Create {@link DatanodeDetails} to register with {@code nodeManager}, and
 +   * provide the datanode maximum capacity so that space used does not block
 +   * pipeline creation. Also check that the result of registering matched
 +   * {@code expectedResult}.
 +   * @return The created {@link DatanodeDetails}.
 +   */
 +  private DatanodeDetails registerWithCapacity(SCMNodeManager nodeManager,
 +      LayoutVersionProto layout, ErrorCode expectedResult) {
 +    DatanodeDetails details = MockDatanodeDetails.randomDatanodeDetails();
 +    StorageReportProto storageReport =
 +        TestUtils.createStorageReport(details.getUuid(),
 +            details.getNetworkFullPath(), Long.MAX_VALUE);
 +    RegisteredCommand cmd = nodeManager.register(
 +        MockDatanodeDetails.randomDatanodeDetails(),
-         TestUtils.createNodeReport(storageReport),
++        TestUtils.createNodeReport(Arrays.asList(storageReport),
++            Collections.emptyList()),
 +        getRandomPipelineReports(), layout);
 +
 +    Assert.assertEquals(expectedResult, cmd.getError());
 +    return cmd.getDatanode();
 +  }
 +
 +  private void assertPipelineClosedAfterLayoutHeartbeat(
 +      DatanodeDetails originalNode1, DatanodeDetails originalNode2,
 +      SCMNodeManager nodeManager, LayoutVersionProto layout) throws Exception {
 +
 +    List<DatanodeDetails>  originalNodes =
 +        Arrays.asList(originalNode1, originalNode2);
 +
 +    // Initial condition: 2 healthy nodes registered.
 +    assertPipelines(HddsProtos.ReplicationFactor.ONE, count -> count == 2,
 +        originalNodes);
 +    assertPipelines(HddsProtos.ReplicationFactor.THREE,
 +        count -> count == 0, new ArrayList<>());
 +
 +    // Even when safemode exit or new node addition trigger pipeline
 +    // creation, they will fail with not enough healthy nodes for ratis 3
 +    // pipeline. Therefore we do not have to worry about this create call
 +    // failing due to datanodes reaching their maximum pipeline limit.
 +    assertPipelineCreationFailsWithNotEnoughNodes(2);
 +
 +    // Register a new node correctly.
 +    DatanodeDetails node = registerWithCapacity(nodeManager);
 +
 +    List<DatanodeDetails> allNodes = new ArrayList<>(originalNodes);
 +    allNodes.add(node);
 +
 +    // Safemode exit and adding the new node should trigger pipeline creation.
 +    assertPipelines(HddsProtos.ReplicationFactor.ONE, count -> count == 3,
 +        allNodes);
 +    assertPipelines(HddsProtos.ReplicationFactor.THREE, count -> count >= 1,
 +        allNodes);
 +
 +    // The node sends an incorrect layout version.
 +    nodeManager.processHeartbeat(node, layout);
 +
 +    // Its pipelines should be closed and then removed, meaning there are
 +    // not enough nodes for factor 3 pipelines.
 +    assertPipelines(HddsProtos.ReplicationFactor.ONE, count -> count == 2,
 +        originalNodes);
 +    assertPipelines(HddsProtos.ReplicationFactor.THREE,
 +        count -> count == 0, new ArrayList<>());
 +
 +    assertPipelineCreationFailsWithNotEnoughNodes(2);
 +  }
 +
 +  /**
 +   * Tests that node manager handles layout versions for newly registered nodes
 +   * correctly.
 +   *
 +   * @throws IOException
 +   * @throws InterruptedException
 +   * @throws TimeoutException
 +   */
 +  @Test
 +  public void testScmLayoutOnRegister()
 +      throws Exception {
 +
 +    OzoneConfiguration conf = getConf();
 +    conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL,
 +        1, TimeUnit.DAYS);
 +
 +    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
 +      Assert.assertTrue(scm.checkLeader());
 +      // Nodes with mismatched SLV cannot join the cluster.
 +      registerWithCapacity(nodeManager,
 +          LARGER_SLV_LAYOUT_PROTO, errorNodeNotPermitted);
 +      registerWithCapacity(nodeManager,
 +          SMALLER_MLV_SLV_LAYOUT_PROTO, errorNodeNotPermitted);
 +      registerWithCapacity(nodeManager,
 +          LARGER_MLV_SLV_LAYOUT_PROTO, errorNodeNotPermitted);
 +      // Nodes with mismatched MLV can join, but should not be allowed in
 +      // pipelines.
 +      DatanodeDetails badMlvNode1 = registerWithCapacity(nodeManager,
 +          SMALLER_MLV_LAYOUT_PROTO, success);
 +      DatanodeDetails badMlvNode2 = registerWithCapacity(nodeManager,
 +          SMALLER_MLV_LAYOUT_PROTO, success);
 +      // This node has correct MLV and SLV, so it can join and be used in
 +      // pipelines.
 +      DatanodeDetails goodNode = registerWithCapacity(nodeManager,
 +          CORRECT_LAYOUT_PROTO, success);
 +
 +      Assert.assertEquals(3, nodeManager.getAllNodes().size());
 +
 +      scm.exitSafeMode();
 +
 +      // SCM should auto create a factor 1 pipeline for the one healthy node.
 +      // Still should not have enough healthy nodes for ratis 3 pipeline.
 +      assertPipelines(HddsProtos.ReplicationFactor.ONE,
 +          count -> count == 1,
 +          Collections.singletonList(goodNode));
 +      assertPipelines(HddsProtos.ReplicationFactor.THREE,
 +          count -> count == 0,
 +          new ArrayList<>());
 +
 +      // Even when safemode exit or new node addition trigger pipeline
 +      // creation, they will fail with not enough healthy nodes for ratis 3
 +      // pipeline. Therefore we do not have to worry about this create call
 +      // failing due to datanodes reaching their maximum pipeline limit.
 +      assertPipelineCreationFailsWithNotEnoughNodes(1);
 +
 +      // Heartbeat bad MLV nodes back to healthy.
 +      nodeManager.processHeartbeat(badMlvNode1, CORRECT_LAYOUT_PROTO);
 +      nodeManager.processHeartbeat(badMlvNode2, CORRECT_LAYOUT_PROTO);
 +
 +      // After moving out of healthy readonly, pipeline creation should be
 +      // triggered.
 +      assertPipelines(HddsProtos.ReplicationFactor.ONE,
 +          count -> count == 3,
 +          Arrays.asList(badMlvNode1, badMlvNode2, goodNode));
 +      assertPipelines(HddsProtos.ReplicationFactor.THREE,
 +          count -> count >= 1,
 +          Arrays.asList(badMlvNode1, badMlvNode2, goodNode));
 +    }
 +  }
 +
 +  private void assertPipelineCreationFailsWithNotEnoughNodes(
 +      int actualNodeCount) throws Exception {
 +    try {
 +      ReplicationConfig ratisThree =
 +          ReplicationConfig.fromTypeAndFactor(HddsProtos.ReplicationType.RATIS,
 +              HddsProtos.ReplicationFactor.THREE);
 +      scm.getPipelineManager().createPipeline(ratisThree);
 +      Assert.fail("3 nodes should not have been found for a pipeline.");
 +    } catch (SCMException ex) {
 +      Assert.assertTrue(ex.getMessage().contains("Required 3. Found " +
 +          actualNodeCount));
 +    }
 +  }
 +
 +  private void assertPipelines(HddsProtos.ReplicationFactor factor,
 +      Predicate<Integer> countCheck, Collection<DatanodeDetails> allowedDNs)
 +      throws Exception {
 +
 +    Set<String> allowedDnIds = allowedDNs.stream()
 +        .map(DatanodeDetails::getUuidString)
 +        .collect(Collectors.toSet());
 +
 +    RatisReplicationConfig replConfig = new RatisReplicationConfig(factor);
 +
 +    // Wait for the expected number of pipelines using allowed DNs.
 +    GenericTestUtils.waitFor(() -> {
 +      List<Pipeline> pipelines = scm.getPipelineManager()
 +          .getPipelines(replConfig);
 +      LOG.info("Found {} pipelines of type {} and factor {}.", pipelines.size(),
 +          replConfig.getReplicationType(), replConfig.getReplicationFactor());
 +      boolean success = countCheck.test(pipelines.size());
 +
 +      // If we have the correct number of pipelines, make sure that none of
 +      // these pipelines use nodes outside of allowedDNs.
 +      if (success) {
 +        for (Pipeline pipeline: pipelines) {
 +          for(DatanodeDetails pipelineDN: pipeline.getNodes()) {
 +            // Do not wait for this condition to be true. Disallowed DNs should
 +            // never be used once we have the expected number of pipelines.
 +            if (!allowedDnIds.contains(pipelineDN.getUuidString())) {
 +              String message = String.format("Pipeline %s used datanode %s " +
 +                      "which is not in the set of allowed datanodes: %s",
 +                  pipeline.getId().toString(), pipelineDN.getUuidString(),
 +                  allowedDnIds.toString());
 +              Assert.fail(message);
 +            }
 +          }
 +        }
 +      }
 +
 +      return success;
 +    }, 1000, 10000);
 +  }
 +
 +  /**
     * asserts that if we send no heartbeats node manager stays in safemode.
     *
     * @throws IOException
@@@ -1338,8 -937,9 +1339,9 @@@
          String storagePath = testDir.getAbsolutePath() + "/" + dnId;
          StorageReportProto report = TestUtils
              .createStorageReport(dnId, storagePath, capacity, used, free, null);
-         nodeManager.register(dn, TestUtils.createNodeReport(report), null);
+         nodeManager.register(dn, TestUtils.createNodeReport(
+             Arrays.asList(report), Collections.emptyList()), null);
 -        nodeManager.processHeartbeat(dn);
 +        nodeManager.processHeartbeat(dn, layoutInfo);
        }
        //TODO: wait for EventQueue to be processed
        eventQueue.processAll(8000L);
@@@ -1386,17 -986,13 +1388,18 @@@
        for (int x = 0; x < volumeCount; x++) {
          String storagePath = testDir.getAbsolutePath() + "/" + dnId;
          reports.add(TestUtils
 -                .createStorageReport(dnId, storagePath, capacity,
 -                        used, free, null, failed));
 +            .createStorageReport(dnId, storagePath, capacity,
 +                used, free, null, failed));
          failed = !failed;
        }
-       nodeManager.register(dn, TestUtils.createNodeReport(reports), null);
+       nodeManager.register(dn, TestUtils.createNodeReport(reports,
+           Collections.emptyList()), null);
 -      nodeManager.processHeartbeat(dn);
 +      LayoutVersionManager versionManager =
 +          nodeManager.getLayoutVersionManager();
 +      LayoutVersionProto layoutInfo = toLayoutVersionProto(
 +          versionManager.getMetadataLayoutVersion(),
 +          versionManager.getSoftwareLayoutVersion());
 +      nodeManager.processHeartbeat(dn, layoutInfo);
        //TODO: wait for EventQueue to be processed
        eventQueue.processAll(8000L);
  
@@@ -1446,16 -1042,12 +1449,17 @@@
          StorageReportProto report = TestUtils
              .createStorageReport(dnId, storagePath, capacity, scmUsed,
                  remaining, null);
-         NodeReportProto nodeReportProto = TestUtils.createNodeReport(report);
+         NodeReportProto nodeReportProto = TestUtils.createNodeReport(
+             Arrays.asList(report), Collections.emptyList());
          nodeReportHandler.onMessage(
 -                new NodeReportFromDatanode(datanodeDetails, nodeReportProto),
 -                publisher);
 -        nodeManager.processHeartbeat(datanodeDetails);
 +            new NodeReportFromDatanode(datanodeDetails, nodeReportProto),
 +            publisher);
 +        LayoutVersionManager versionManager =
 +            nodeManager.getLayoutVersionManager();
 +        LayoutVersionProto layoutInfo = toLayoutVersionProto(
 +            versionManager.getMetadataLayoutVersion(),
 +            versionManager.getSoftwareLayoutVersion());
 +        nodeManager.processHeartbeat(datanodeDetails, layoutInfo);
          Thread.sleep(100);
        }
  
@@@ -1759,15 -1342,11 +1765,20 @@@
            .createStorageReport(dnId, storagePath, capacity, used,
                remaining, null);
  
+       nodeManager.register(datanodeDetails, TestUtils.createNodeReport(
+           Arrays.asList(report), Collections.emptyList()),
+           TestUtils.getRandomPipelineReports());
  
 -      nodeManager.processHeartbeat(datanodeDetails);
 +      LayoutVersionManager versionManager =
 +          nodeManager.getLayoutVersionManager();
 +      LayoutVersionProto layoutInfo = toLayoutVersionProto(
 +          versionManager.getMetadataLayoutVersion(),
 +          versionManager.getSoftwareLayoutVersion());
-       nodeManager.register(datanodeDetails, TestUtils.createNodeReport(report),
++      nodeManager.register(datanodeDetails,
++          TestUtils.createNodeReport(Arrays.asList(report),
++              Collections.emptyList()),
 +          TestUtils.getRandomPipelineReports(), layoutInfo);
 +      nodeManager.processHeartbeat(datanodeDetails, layoutInfo);
        if (i == 5) {
          nodeManager.setNodeOperationalState(datanodeDetails,
              HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE);
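
The layout-version tests above encode a simple admission rule: a datanode whose software layout version (SLV) differs from SCM's cannot register at all, while a node with a matching SLV but an older metadata layout version (MLV) registers successfully but is kept out of pipelines until it heartbeats the expected MLV. A standalone sketch of that rule, with illustrative names only, not Ozone's API:

    // Standalone sketch of the registration rule exercised above.
    // MLV = metadata layout version, SLV = software layout version.
    public final class LayoutAdmissionSketch {

      enum Admission { REJECTED, HEALTHY_READONLY, HEALTHY }

      static Admission admit(int scmMlv, int scmSlv, int nodeMlv, int nodeSlv) {
        if (nodeSlv != scmSlv) {
          return Admission.REJECTED;          // mismatched SLV cannot join
        }
        if (nodeMlv < scmMlv) {
          return Admission.HEALTHY_READONLY;  // joins, but no pipelines yet
        }
        return Admission.HEALTHY;             // eligible for pipelines
      }

      public static void main(String[] args) {
        System.out.println(admit(2, 3, 2, 4)); // REJECTED: larger SLV
        System.out.println(admit(2, 3, 1, 2)); // REJECTED: smaller SLV
        System.out.println(admit(2, 3, 1, 3)); // HEALTHY_READONLY: smaller MLV
        System.out.println(admit(2, 3, 2, 3)); // HEALTHY
      }
    }
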
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index 1dcefa4,4a91502..85643d8
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@@ -275,10 -274,12 +277,13 @@@ public class TestEndPoint 
        SCMRegisteredResponseProto responseProto = rpcEndPoint.getEndPoint()
            .register(nodeToRegister.getExtendedProtoBufMessage(), TestUtils
                    .createNodeReport(
-                       getStorageReports(nodeToRegister.getUuid())),
+                       Arrays.asList(getStorageReports(
+                           nodeToRegister.getUuid())),
+                       Arrays.asList(getMetadataStorageReports(
+                           nodeToRegister.getUuid()))),
                TestUtils.getRandomContainerReports(10),
 -              TestUtils.getRandomPipelineReports());
 +              TestUtils.getRandomPipelineReports(),
 +              defaultLayoutVersionProto());
        Assert.assertNotNull(responseProto);
        Assert.assertEquals(nodeToRegister.getUuidString(),
            responseProto.getDatanodeUUID());
@@@ -294,7 -295,14 +299,13 @@@
      return TestUtils.createStorageReport(id, storagePath, 100, 10, 90, null);
    }
  
+   private MetadataStorageReportProto getMetadataStorageReports(UUID id) {
+     String storagePath = testDir.getAbsolutePath() + "/metadata-" + id;
+     return TestUtils.createMetadataStorageReport(storagePath, 100, 10, 90,
+         null);
+   }
+ 
 -  private EndpointStateMachine registerTaskHelper(
 -      InetSocketAddress scmAddress,
 +  private EndpointStateMachine registerTaskHelper(InetSocketAddress scmAddress,
        int rpcTimeout, boolean clearDatanodeDetails
    ) throws Exception {
      OzoneConfiguration conf = SCMTestUtils.getConf();
diff --cc hadoop-ozone/dist/src/main/compose/upgrade/compose/non-ha/docker-config
index a6d4482,69e7b91..1a7419c
--- a/hadoop-ozone/dist/src/main/compose/upgrade/compose/non-ha/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/upgrade/compose/non-ha/docker-config
@@@ -14,13 -14,12 +14,14 @@@
  # See the License for the specific language governing permissions and
  # limitations under the License.
  
 -CORE-SITE.XML_fs.ofs.impl=org.apache.hadoop.fs.ozone.RootedOzoneFileSystem
 -CORE-SITE.XML_fs.o3fs.impl=org.apache.hadoop.fs.ozone.OzoneFileSystem
 +OZONE-SITE.XML_ozone.metadata.dirs=/data/metadata
 +
 +OZONE-SITE.XML_ozone.client.failover.max.attempts=6
 +
  OZONE-SITE.XML_ozone.om.address=om
  OZONE-SITE.XML_ozone.om.http-address=om:9874
- 
+ OZONE-SITE.XML_ozone.scm.container.size=1GB
+ OZONE-SITE.XML_ozone.scm.datanode.ratis.volume.free-space.min=10MB
  OZONE-SITE.XML_ozone.scm.pipeline.creation.interval=30s
  OZONE-SITE.XML_ozone.scm.pipeline.owner.container.count=1
  OZONE-SITE.XML_ozone.scm.names=scm
diff --cc hadoop-ozone/dist/src/main/compose/upgrade/test.sh
index 2857c1a,0aa1419..a323310
--- a/hadoop-ozone/dist/src/main/compose/upgrade/test.sh
+++ b/hadoop-ozone/dist/src/main/compose/upgrade/test.sh
@@@ -15,27 -15,18 +15,30 @@@
  # See the License for the specific language governing permissions and
  # limitations under the License.
  
 -SCRIPT_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )
 -ALL_RESULT_DIR="$SCRIPT_DIR/result"
 -mkdir -p "$ALL_RESULT_DIR"
 -rm "$ALL_RESULT_DIR"/* || true
 -source "$SCRIPT_DIR/../testlib.sh"
 +# Version that will be run using the local build.
 +: "${OZONE_CURRENT_VERSION:=1.1.0}"
 +export OZONE_CURRENT_VERSION
  
 -tests=$(find_tests)
 -cd "$SCRIPT_DIR"
 +TEST_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )
 +source "$TEST_DIR/testlib.sh"
 +
 +# Export variables needed by tests and ../testlib.sh.
 +export TEST_DIR
 +export COMPOSE_DIR="$TEST_DIR"
  
+ RESULT=0
+ run_test_scripts ${tests} || RESULT=$?
+ 
 -generate_report "upgrade" "${ALL_RESULT_DIR}"
 +RESULT_DIR="$ALL_RESULT_DIR" create_results_dir
 +
 +# Upgrade tests to be run.
 +# Run all upgrades even if one fails.
 +# Any failure will save a failing return code to $RESULT.
 +set +e
 +run_test manual-upgrade 0.5.0 1.1.0
 +run_test non-rolling-upgrade 1.0.0 1.1.0
 +set -e
 +
 +generate_report "upgrade" "$ALL_RESULT_DIR"
  
 -exit ${RESULT}
 +exit "$RESULT"
diff --cc hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOpenContainerCount.java
index 0000000,2647c9a..0e66553
mode 000000,100644..100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOpenContainerCount.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOpenContainerCount.java
@@@ -1,0 -1,424 +1,427 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  * <p>
+  * http://www.apache.org/licenses/LICENSE-2.0
+  * <p>
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.ozone.recon.api;
+ 
+ import org.apache.hadoop.ozone.OzoneConsts;
+ import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+ import org.apache.hadoop.hdds.protocol.proto.HddsProtos
+         .ExtendedDatanodeDetailsProto;
+ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.PipelineID;
+ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
+ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
+ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageTypeProto;
+ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto.Builder;
+ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
+ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
+ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
+ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+ import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+ import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
+ import org.apache.hadoop.hdfs.web.URLConnectionFactory;
+ import org.apache.hadoop.ozone.recon.MetricsServiceProviderFactory;
+ import org.apache.hadoop.ozone.recon.ReconTestInjector;
+ import org.apache.hadoop.ozone.recon.ReconUtils;
+ import org.apache.hadoop.ozone.recon.api.types.DatanodeMetadata;
+ import org.apache.hadoop.ozone.recon.api.types.DatanodesResponse;
+ import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
+ import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+ import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
+ import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
+ import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
+ import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl;
+ import org.apache.ozone.test.LambdaTestUtils;
+ import org.junit.Assert;
+ import org.junit.Before;
+ import org.junit.Rule;
+ import org.junit.Test;
+ import org.junit.rules.TemporaryFolder;
+ 
+ import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
++import static org.apache.hadoop.ozone.container.upgrade.UpgradeUtils.defaultLayoutVersionProto;
+ import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getRandomPipeline;
+ import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager;
+ import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.initializeNewOmMetadataManager;
+ import static org.mockito.ArgumentMatchers.any;
+ import static org.mockito.ArgumentMatchers.anyBoolean;
+ import static org.mockito.ArgumentMatchers.anyString;
+ import static org.mockito.Mockito.mock;
+ import static org.mockito.Mockito.when;
+ 
+ import javax.servlet.http.HttpServletResponse;
+ import javax.ws.rs.core.Response;
+ 
+ import java.io.IOException;
+ import java.net.HttpURLConnection;
+ import java.util.*;
+ import java.util.concurrent.Callable;
+ 
+ /**
+  * Test for Open Container count per Datanode.
+  */
+ public class TestOpenContainerCount {
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+ 
+   private NodeEndpoint nodeEndpoint;
+   private ReconOMMetadataManager reconOMMetadataManager;
+   private ReconStorageContainerManagerFacade reconScm;
+   private boolean isSetupDone = false;
+   private String pipelineId;
+   private String pipelineId2;
+   private DatanodeDetails datanodeDetails;
+   private String datanodeId;
+   private ContainerReportsProto containerReportsProto;
+   private Builder builder;
+   private ExtendedDatanodeDetailsProto extendedDatanodeDetailsProto;
+   private NodeReportProto nodeReportProto;
+   private PipelineReportsProto pipelineReportsProto;
+   private Pipeline pipeline;
+   private Pipeline pipeline2;
+   private static final String HOST1 = "host1.datanode";
+   private static final String IP1 = "1.1.1.1";
+   private ReconUtils reconUtilsMock;
+   private StorageContainerServiceProvider mockScmServiceProvider;
+ 
+   private List<Long> containerIDs;
+ 
+   private List<ContainerWithPipeline> cpw;
+ 
+   private void initializeInjector() throws Exception {
+     reconOMMetadataManager = getTestReconOmMetadataManager(
+             initializeNewOmMetadataManager(temporaryFolder.newFolder()),
+             temporaryFolder.newFolder());
+     datanodeDetails = randomDatanodeDetails();
+     datanodeDetails.setHostName(HOST1);
+     datanodeDetails.setIpAddress(IP1);
+     pipeline = getRandomPipeline(datanodeDetails);
+     pipelineId = pipeline.getId().getId().toString();
+ 
+     pipeline2 = getRandomPipeline(datanodeDetails);
+     pipelineId2 = pipeline2.getId().getId().toString();
+ 
+     StorageContainerLocationProtocol mockScmClient = mock(
+             StorageContainerLocationProtocol.class);
+     mockScmServiceProvider = mock(
+             StorageContainerServiceProviderImpl.class);
+ 
+     when(mockScmServiceProvider.getPipeline(
+             pipeline.getId().getProtobuf())).thenReturn(pipeline);
+     when(mockScmServiceProvider.getPipeline(
+             pipeline2.getId().getProtobuf())).thenReturn(pipeline2);
+ 
+     // Open 5 containers on pipeline 1
+     containerIDs = new LinkedList<>();
+     cpw = new LinkedList<>();
+     for (long i = 1L; i <= 5L; ++i) {
+       ContainerInfo containerInfo = new ContainerInfo.Builder()
+               .setContainerID(i)
+               .setReplicationConfig(
+                       new RatisReplicationConfig(ReplicationFactor.ONE))
+               .setState(LifeCycleState.OPEN)
+               .setOwner("test")
+               .setPipelineID(pipeline.getId())
+               .build();
+       ContainerWithPipeline containerWithPipeline =
+               new ContainerWithPipeline(containerInfo, pipeline);
+       when(mockScmServiceProvider.getContainerWithPipeline(i))
+               .thenReturn(containerWithPipeline);
+       containerIDs.add(i);
+       cpw.add(containerWithPipeline);
+     }
+ 
+     // Open 5 containers on pipeline 2
+     for (long i = 6L; i <= 10L; ++i) {
+       ContainerInfo containerInfo = new ContainerInfo.Builder()
+               .setContainerID(i)
+               .setReplicationConfig(
+                       new RatisReplicationConfig(ReplicationFactor.ONE))
+               .setState(LifeCycleState.OPEN)
+               .setOwner("test")
+               .setPipelineID(pipeline2.getId())
+               .build();
+       ContainerWithPipeline containerWithPipeline =
+               new ContainerWithPipeline(containerInfo, pipeline2);
+       when(mockScmServiceProvider.getContainerWithPipeline(i))
+               .thenReturn(containerWithPipeline);
+       containerIDs.add(i);
+       cpw.add(containerWithPipeline);
+     }
+ 
+     when(mockScmServiceProvider
+             .getExistContainerWithPipelinesInBatch(containerIDs))
+             .thenReturn(cpw);
+ 
+     reconUtilsMock = mock(ReconUtils.class);
+     HttpURLConnection urlConnectionMock = mock(HttpURLConnection.class);
+     when(urlConnectionMock.getResponseCode())
+             .thenReturn(HttpServletResponse.SC_OK);
+     when(reconUtilsMock.makeHttpCall(any(URLConnectionFactory.class),
+             anyString(), anyBoolean())).thenReturn(urlConnectionMock);
+ 
+     ReconTestInjector reconTestInjector =
+             new ReconTestInjector.Builder(temporaryFolder)
+                     .withReconSqlDb()
+                     .withReconOm(reconOMMetadataManager)
+                     .withOmServiceProvider(
+                             mock(OzoneManagerServiceProviderImpl.class))
+                     .addBinding(StorageContainerServiceProvider.class,
+                             mockScmServiceProvider)
+                     .addBinding(OzoneStorageContainerManager.class,
+                             ReconStorageContainerManagerFacade.class)
+                     .withContainerDB()
+                     .addBinding(NodeEndpoint.class)
+                     .addBinding(MetricsServiceProviderFactory.class)
+                     .addBinding(ContainerHealthSchemaManager.class)
+                     .addBinding(ReconUtils.class, reconUtilsMock)
+                     .addBinding(StorageContainerLocationProtocol.class,
+                             mockScmClient)
+                     .build();
+ 
+     nodeEndpoint = reconTestInjector.getInstance(NodeEndpoint.class);
+     reconScm = (ReconStorageContainerManagerFacade)
+             reconTestInjector.getInstance(OzoneStorageContainerManager.class);
+   }
+ 
+   @Before
+   public void setUp() throws Exception {
+     // The following setup runs only once
+     if (!isSetupDone) {
+       initializeInjector();
+       isSetupDone = true;
+     }
+     datanodeId = datanodeDetails.getUuid().toString();
+ 
+     // initialize container report
+     builder = ContainerReportsProto.newBuilder();
+     for (long i = 1L; i <= 10L; i++) {
+       builder.addReports(
+               ContainerReplicaProto.newBuilder()
+                       .setContainerID(i)
+                       .setState(ContainerReplicaProto.State.OPEN)
+                       .setOriginNodeId(datanodeId)
+                       .build()
+       );
+     }
+     containerReportsProto = builder.build();
+ 
+     UUID pipelineUuid = UUID.fromString(pipelineId);
+     HddsProtos.UUID uuid128 = HddsProtos.UUID.newBuilder()
+             .setMostSigBits(pipelineUuid.getMostSignificantBits())
+             .setLeastSigBits(pipelineUuid.getLeastSignificantBits())
+             .build();
+ 
+     UUID pipelineUuid2 = UUID.fromString(pipelineId2);
+     HddsProtos.UUID uuid1282 = HddsProtos.UUID.newBuilder()
+             .setMostSigBits(pipelineUuid2.getMostSignificantBits())
+             .setLeastSigBits(pipelineUuid2.getLeastSignificantBits())
+             .build();
+ 
+     PipelineReport pipelineReport = PipelineReport.newBuilder()
+             .setPipelineID(
+                     PipelineID.newBuilder()
+                             .setId(pipelineId)
+                             .setUuid128(uuid128)
+                             .build())
+             .setIsLeader(true)
+             .build();
+ 
+     PipelineReport pipelineReport2 = PipelineReport.newBuilder()
+             .setPipelineID(
+                     PipelineID
+                             .newBuilder()
+                             .setId(pipelineId2)
+                             .setUuid128(uuid1282)
+                             .build())
+             .setIsLeader(false)
+             .build();
+ 
+     pipelineReportsProto =
+             PipelineReportsProto.newBuilder()
+                     .addPipelineReport(pipelineReport)
+                     .addPipelineReport(pipelineReport2)
+                     .build();
+ 
+     DatanodeDetailsProto datanodeDetailsProto =
+             DatanodeDetailsProto.newBuilder()
+                     .setHostName(HOST1)
+                     .setUuid(datanodeId)
+                     .setIpAddress(IP1)
+                     .build();
+ 
+     extendedDatanodeDetailsProto =
+             HddsProtos.ExtendedDatanodeDetailsProto.newBuilder()
+                     .setDatanodeDetails(datanodeDetailsProto)
+                     .setVersion("0.6.0")
+                     .setSetupTime(1596347628802L)
+                     .setBuildDate("2020-08-01T08:50Z")
+                     .setRevision("3346f493fa1690358add7bb9f3e5b52545993f36")
+                     .build();
+ 
+     StorageReportProto storageReportProto1 =
+             StorageReportProto.newBuilder()
+                     .setStorageType(StorageTypeProto.DISK)
+                     .setStorageLocation("/disk1")
+                     .setScmUsed(10 * OzoneConsts.GB)
+                     .setRemaining(90 * OzoneConsts.GB)
+                     .setCapacity(100 * OzoneConsts.GB)
+                     .setStorageUuid(UUID.randomUUID().toString())
+                     .setFailed(false).build();
+ 
+     StorageReportProto storageReportProto2 =
+             StorageReportProto.newBuilder()
+                     .setStorageType(StorageTypeProto.DISK)
+                     .setStorageLocation("/disk2")
+                     .setScmUsed(10 * OzoneConsts.GB)
+                     .setRemaining(90 * OzoneConsts.GB)
+                     .setCapacity(100 * OzoneConsts.GB)
+                     .setStorageUuid(UUID.randomUUID().toString())
+                     .setFailed(false).build();
+ 
+     nodeReportProto =
+             NodeReportProto.newBuilder()
+                     .addStorageReport(storageReportProto1)
+                     .addStorageReport(storageReportProto2).build();
+ 
+     try {
+       reconScm.getDatanodeProtocolServer()
+               .register(extendedDatanodeDetailsProto, nodeReportProto,
 -                      containerReportsProto, pipelineReportsProto);
++                      containerReportsProto, pipelineReportsProto,
++                  defaultLayoutVersionProto());
+       // Process all events in the event queue
+       reconScm.getEventQueue().processAll(1000);
+     } catch (Exception ex) {
+       Assert.fail(ex.getMessage());
+     }
+   }
+ 
+   @Test
+   public void testOpenContainerCount() throws Exception {
+     // Covers the case where the pipeline does not exist yet.
+     waitAndCheckConditionAfterHeartbeat(() -> {
+ 
+       DatanodeMetadata datanodeMetadata1 = getDatanodeMetadata();
+       return datanodeMetadata1.getContainers() == 10
+               && datanodeMetadata1.getPipelines().size() == 2;
+     });
+ 
+     DatanodeMetadata datanodeMetadata = getDatanodeMetadata();
+ 
+     int expectedCnt = datanodeMetadata.getOpenContainers();
+ 
+     // Check that the open container count decrements accordingly.
+     for (long id = 1L; id <= 10L; ++id) {
+       --expectedCnt;
+       closeContainer(id);
+       DatanodeMetadata metadata = getDatanodeMetadata();
+       Assert.assertEquals(expectedCnt, metadata.getOpenContainers());
+     }
+   }
+ 
+   private DatanodeMetadata getDatanodeMetadata() {
+     Response response = nodeEndpoint.getDatanodes();
+     DatanodesResponse datanodesResponse =
+             (DatanodesResponse) response.getEntity();
+ 
+     DatanodeMetadata datanodeMetadata =
+             datanodesResponse.getDatanodes().stream().filter(metadata ->
+                     metadata.getHostname().equals("host1.datanode"))
+                     .findFirst().orElse(null);
+     return datanodeMetadata;
+   }
+ 
+   private void closeContainer(long containerID) throws IOException {
+ 
+     if (containerID >= 1L && containerID <= 5L) {
+       ContainerInfo closedContainer = new ContainerInfo.Builder()
+               .setContainerID(containerID)
+               .setReplicationConfig(
+                       new RatisReplicationConfig(ReplicationFactor.ONE))
+               .setState(LifeCycleState.CLOSED)
+               .setOwner("test")
+               .setPipelineID(pipeline.getId())
+               .build();
+       ContainerWithPipeline containerWithPipeline =
+               new ContainerWithPipeline(closedContainer, pipeline);
+       when(mockScmServiceProvider.getContainerWithPipeline(containerID))
+               .thenReturn(containerWithPipeline);
+       cpw.set((int) containerID - 1, containerWithPipeline);
+     } else if (containerID >= 6L && containerID <= 10L) {
+       ContainerInfo closedContainer = new ContainerInfo.Builder()
+               .setContainerID(containerID)
+               .setReplicationConfig(
+                       new RatisReplicationConfig(ReplicationFactor.ONE))
+               .setState(LifeCycleState.CLOSED)
+               .setOwner("test")
+               .setPipelineID(pipeline2.getId())
+               .build();
+       ContainerWithPipeline containerWithPipeline =
+               new ContainerWithPipeline(closedContainer, pipeline2);
+       when(mockScmServiceProvider.getContainerWithPipeline(containerID))
+               .thenReturn(containerWithPipeline);
+       cpw.set((int) containerID - 1, containerWithPipeline);
+     }
+     when(mockScmServiceProvider
+             .getExistContainerWithPipelinesInBatch(containerIDs))
+             .thenReturn(cpw);
+     updateContainerReport(containerID);
+   }
+ 
+   private void updateContainerReport(long containerId) {
+     containerReportsProto = builder.setReports((int) containerId - 1,
+             ContainerReplicaProto.newBuilder()
+                     .setContainerID(containerId)
+                     .setState(ContainerReplicaProto.State.CLOSED)
+                     .setOriginNodeId(datanodeId)
+                     .build())
+             .build();
+     try {
+       reconScm.getDatanodeProtocolServer()
+               .register(extendedDatanodeDetailsProto, nodeReportProto,
 -                      containerReportsProto, pipelineReportsProto);
++                      containerReportsProto, pipelineReportsProto,
++                  defaultLayoutVersionProto());
+       // Process all events in the event queue
+       reconScm.getEventQueue().processAll(1000);
+     } catch (Exception ex) {
+       Assert.fail(ex.getMessage());
+     }
+   }
+ 
+   private void waitAndCheckConditionAfterHeartbeat(Callable<Boolean> check)
+           throws Exception {
+     // If the container report is processed first and the pipeline does not
+     // exist, the container is not added until the next container report is
+     // processed.
+     SCMHeartbeatRequestProto heartbeatRequestProto =
+             SCMHeartbeatRequestProto.newBuilder()
+                     .setContainerReport(containerReportsProto)
+                     .setDatanodeDetails(extendedDatanodeDetailsProto
+                             .getDatanodeDetails())
+                     .build();
+ 
+     reconScm.getDatanodeProtocolServer().sendHeartbeat(heartbeatRequestProto);
+     LambdaTestUtils.await(30000, 1000, check);
+   }
+ }
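
The test above drives Recon with full container reports and expects the per-datanode open-container count to drop by one each time a replica is re-reported as CLOSED. A minimal sketch of that counting rule over a datanode's latest report, using stand-in types rather than Recon's API:

    import java.util.Arrays;
    import java.util.List;

    // Minimal sketch: count replicas in the datanode's latest container
    // report whose state is OPEN. Types are stand-ins, not Recon's API.
    public final class OpenContainerCountSketch {

      enum State { OPEN, CLOSED }

      static final class Replica {
        final long containerId;
        final State state;

        Replica(long containerId, State state) {
          this.containerId = containerId;
          this.state = state;
        }
      }

      static long openContainerCount(List<Replica> latestReport) {
        return latestReport.stream()
            .filter(r -> r.state == State.OPEN)
            .count();
      }

      public static void main(String[] args) {
        List<Replica> report = Arrays.asList(
            new Replica(1, State.OPEN),
            new Replica(2, State.CLOSED),   // closing one replica...
            new Replica(3, State.OPEN));
        System.out.println(openContainerCount(report)); // ...leaves 2 open
      }
    }
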
