You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@ozone.apache.org by av...@apache.org on 2021/07/02 06:36:01 UTC
[ozone] 01/01: Merge remote-tracking branch 'ssh-upstream/master' into ssh-upstream-upgrade-branch
This is an automated email from the ASF dual-hosted git repository.
avijayan pushed a commit to branch HDDS-3698-nonrolling-upgrade
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 039afb37ce6fb4522e6d32de974c0cc96d868592
Merge: 4506883 1d8f972
Author: Aravindan Vijayan <av...@cloudera.com>
AuthorDate: Thu Jul 1 23:35:16 2021 -0700
Merge remote-tracking branch 'ssh-upstream/master' into ssh-upstream-upgrade-branch
.github/workflows/post-commit.yml | 8 +-
.../hadoop/hdds/scm/storage/BlockOutputStream.java | 6 +-
.../hadoop/hdds/fs/AbstractSpaceUsageSource.java | 7 +-
.../hadoop/hdds/fs/DedicatedDiskSpaceUsage.java | 8 +-
.../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 86 +----
.../apache/hadoop/hdds/scm/client/ScmClient.java | 21 +
.../hadoop/hdds/scm/container/ContainerInfo.java | 5 +
.../hadoop/hdds/scm/ha/SCMHAConfiguration.java | 4 +-
.../protocol/StorageContainerLocationProtocol.java | 21 +
.../hadoop/hdds/security/x509/crl/CRLStatus.java | 87 +++++
.../org/apache/hadoop/ozone/audit/SCMAction.java | 3 +
.../common/src/main/resources/ozone-default.xml | 141 +------
.../server/ratis/ContainerStateMachine.java | 74 ++--
.../common/volume/StorageVolumeChecker.java | 20 +-
.../container/stream/DirectoryServerSource.java | 6 +-
.../container/stream/DirstreamClientHandler.java | 5 +
.../container/stream/DirstreamServerHandler.java | 43 ++-
.../ozone/container/stream/StreamingClient.java | 28 +-
.../container/stream/StreamingException.java} | 27 +-
.../ozone/container/stream/StreamingServer.java | 68 ++--
.../stream/TestDirstreamClientHandler.java | 139 +++++++
.../container/stream/TestStreamingServer.java | 48 +++
.../ozone/container/stream/package-info.java} | 43 +--
hadoop-hdds/docs/content/interface/O3fs.md | 8 +
hadoop-hdds/docs/content/interface/O3fs.zh.md | 8 +
...inerLocationProtocolClientSideTranslatorPB.java | 73 ++++
.../SCMBlockLocationFailoverProxyProvider.java | 8 +-
.../SCMSecurityProtocolFailoverProxyProvider.java | 8 +-
.../scm/update/client/CRLClientUpdateHandler.java | 2 +-
.../certificate/authority/CertificateStore.java | 6 +
.../authority/PKIProfiles/DefaultProfile.java | 2 +-
.../hadoop/hdds/security/x509/crl/CRLCodec.java | 0
.../hadoop/hdds/security/x509/crl/CRLInfo.java | 0
.../hdds/security/x509/crl/CRLInfoCodec.java | 0
.../hadoop/hdds/server/events/EventExecutor.java | 5 +
.../hadoop/hdds/server/events/EventQueue.java | 44 ++-
...dExecutor.java => FixedThreadPoolExecutor.java} | 42 +-
.../hdds/server/events/SingleThreadExecutor.java | 9 +
.../hadoop/hdds/server/http/ProfileServlet.java | 2 +-
.../apache/hadoop/hdds/utils/HddsServerUtil.java | 9 +
.../x509/certificate/authority/MockCAStore.java | 12 +
.../hadoop/hdds/server/events/TestEventQueue.java | 62 ++-
.../src/main/proto/ScmAdminProtocol.proto | 50 ++-
hadoop-hdds/interface-client/pom.xml | 13 -
hadoop-hdds/interface-server/pom.xml | 2 +
.../src/main/proto/SCMUpdateProtocol.proto | 0
.../hadoop/hdds/scm/SCMCommonPlacementPolicy.java | 31 +-
.../block/DatanodeDeletedBlockTransactions.java | 6 -
.../hdds/scm/block/DeletedBlockLogImplV2.java | 1 +
.../scm/block/DeletedBlockLogStateManagerImpl.java | 26 +-
.../hdds/scm/block/PendingDeleteStatusList.java | 85 ----
.../hdds/scm/block/SCMBlockDeletingService.java | 24 --
.../hdds/scm/container/ContainerReportHandler.java | 38 --
.../scm/container/ContainerStateManagerImpl.java | 2 +-
.../hdds/scm/container/ReplicationManager.java | 11 +-
.../scm/container/balancer/ContainerBalancer.java | 101 +++--
.../balancer/ContainerBalancerConfiguration.java | 28 ++
.../hdds/scm/crl/CRLStatusReportHandler.java | 87 +++++
.../apache/hadoop/hdds/scm/crl/package-info.java} | 43 +--
.../apache/hadoop/hdds/scm/events/SCMEvents.java | 19 +-
.../hadoop/hdds/scm/ha/SCMRatisServerImpl.java | 8 +-
.../apache/hadoop/hdds/scm/node/DatanodeInfo.java | 14 +
.../hdds/scm/pipeline/PipelinePlacementPolicy.java | 15 +-
.../scm/pipeline/PipelineStateManagerV2Impl.java | 4 +-
...inerLocationProtocolServerSideTranslatorPB.java | 70 ++++
.../scm/server/OzoneStorageContainerManager.java | 3 +
.../hdds/scm/server/SCMBlockProtocolServer.java | 2 +
.../hadoop/hdds/scm/server/SCMCertStore.java | 20 +-
.../hdds/scm/server/SCMClientProtocolServer.java | 65 ++++
.../scm/server/SCMDatanodeHeartbeatDispatcher.java | 15 +-
.../hdds/scm/server/SCMDatanodeProtocolServer.java | 3 +
.../hdds/scm/server/StorageContainerManager.java | 90 +++--
.../java/org/apache/hadoop/hdds/scm/TestUtils.java | 113 ++++--
.../hadoop/hdds/scm/block/TestDeletedBlockLog.java | 96 ++++-
.../hadoop/hdds/scm/container/MockNodeManager.java | 8 +
.../container/balancer/TestContainerBalancer.java | 20 +-
.../algorithms/TestContainerPlacementFactory.java | 11 +
.../TestSCMContainerPlacementCapacity.java | 16 +-
.../TestSCMContainerPlacementRackAware.java | 22 +-
.../TestSCMContainerPlacementRandom.java | 19 +-
.../hdds/scm/crl/TestCRLStatusReportHandler.java | 137 +++++++
.../hadoop/hdds/scm/node/TestDeadNodeHandler.java | 49 ++-
.../hdds/scm/node/TestNodeReportHandler.java | 21 +-
.../hadoop/hdds/scm/node/TestSCMNodeManager.java | 25 +-
.../hdds/scm/node/TestSCMNodeStorageStatMap.java | 11 +-
.../hadoop/hdds/scm/node/TestStatisticsUpdate.java | 14 +-
.../scm/pipeline/TestPipelinePlacementPolicy.java | 33 +-
.../hdds/scm/update/server/MockCRLStore.java | 6 +-
.../ozone/container/common/TestEndPoint.java | 25 +-
.../hdds/scm/cli/ContainerBalancerCommands.java | 108 ++++++
.../scm/cli/ContainerBalancerStartSubcommand.java | 66 ++++
.../cli/ContainerBalancerStatusSubcommand.java} | 38 +-
.../scm/cli/ContainerBalancerStopSubcommand.java} | 35 +-
.../hdds/scm/cli/ContainerOperationClient.java | 20 +
.../datanode/TestContainerBalancerSubCommand.java | 141 +++++++
.../hadoop/ozone/client/MockOmTransport.java | 12 +
.../hadoop/ozone/client/TestOzoneClient.java | 34 +-
.../src/main/compose/compatibility/docker-config | 1 +
.../dist/src/main/compose/ozone-csi/docker-config | 1 +
.../dist/src/main/compose/ozone-ha/docker-config | 1 +
.../dist/src/main/compose/ozone-mr/common-config | 1 +
.../dist/src/main/compose/ozone-mr/test.sh | 6 +-
.../src/main/compose/ozone-om-ha/docker-config | 1 +
.../src/main/compose/ozone-topology/docker-config | 1 +
.../dist/src/main/compose/ozone/docker-config | 1 +
.../src/main/compose/ozones3-haproxy/docker-config | 1 +
.../src/main/compose/ozonesecure-ha/docker-config | 1 +
.../src/main/compose/ozonesecure-mr/docker-config | 2 +
.../dist/src/main/compose/restart/docker-config | 1 +
hadoop-ozone/dist/src/main/compose/test-all.sh | 8 +-
.../compose/upgrade/compose/non-ha/docker-config | 3 +-
hadoop-ozone/dist/src/main/compose/upgrade/test.sh | 3 +
.../dist/src/main/compose/xcompat/docker-config | 1 +
.../src/main/smoketest/ozonefs/hadoopo3fs.robot | 2 +
.../apache/hadoop/ozone/MiniOzoneChaosCluster.java | 3 +
.../ozone/TestContainerBalancerOperations.java | 112 ++++++
.../commandhandler/TestBlockDeletion.java | 47 ---
.../commandhandler/TestCloseContainerHandler.java | 4 +
.../commandhandler/TestDeleteContainerHandler.java | 4 +
.../TestDatanodeHddsVolumeFailureDetection.java | 4 +
.../TestDatanodeHddsVolumeFailureToleration.java | 4 +
.../hadoop/ozone/fsck/TestContainerMapper.java | 5 +
.../src/main/proto/OmClientProtocol.proto | 1 +
.../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 2 +-
.../java/org/apache/hadoop/ozone/om/OMMetrics.java | 1 -
.../apache/hadoop/ozone/om/TrashPolicyOzone.java | 20 +-
.../request/bucket/OMBucketSetPropertyRequest.java | 19 +-
.../bucket/TestOMBucketSetPropertyRequest.java | 10 +
hadoop-ozone/ozonefs-shaded/pom.xml | 3 +
.../hadoop/ozone/recon/api/NodeEndpoint.java | 17 +-
.../ozone/recon/api/types/DatanodeMetadata.java | 15 +
.../ozone/recon/scm/ReconContainerManager.java | 18 +
.../scm/ReconStorageContainerManagerFacade.java | 6 +
.../webapps/recon/ozone-recon-web/api/db.json | 20 +
.../src/views/datanodes/datanodes.tsx | 11 +
.../hadoop/ozone/recon/api/TestEndpoints.java | 3 +-
.../ozone/recon/api/TestOpenContainerCount.java | 427 +++++++++++++++++++++
.../ozone/audit/parser/common/DatabaseHelper.java | 7 +-
.../hadoop/ozone/freon/StreamingGenerator.java | 23 +-
139 files changed, 3046 insertions(+), 929 deletions(-)
diff --cc hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index 60c3931,31c4472..1701e4c
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@@ -36,10 -36,9 +36,11 @@@ import java.util.EnumSet
import java.util.Collections;
import java.util.List;
import java.util.Map;
+ import java.util.Optional;
import java.util.Set;
+import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;
+
/**
* ContainerLocationProtocol is used by an HDFS node to find the set of nodes
* that currently host a container.
diff --cc hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
index 6313579,d67dce9..3728a49
--- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
+++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
@@@ -69,9 -69,9 +69,11 @@@ message ScmContainerLocationRequest
optional DatanodeUsageInfoRequestProto DatanodeUsageInfoRequest = 30;
optional GetExistContainerWithPipelinesInBatchRequestProto getExistContainerWithPipelinesInBatchRequest = 31;
optional GetContainerTokenRequestProto containerTokenRequest = 32;
- optional FinalizeScmUpgradeRequestProto finalizeScmUpgradeRequest = 33;
- optional QueryUpgradeFinalizationProgressRequestProto
- queryUpgradeFinalizationProgressRequest = 34;
+ optional StartContainerBalancerRequestProto startContainerBalancerRequest = 33;
+ optional StopContainerBalancerRequestProto stopContainerBalancerRequest = 34;
+ optional ContainerBalancerStatusRequestProto containerBalancerStatusRequest = 35;
++ optional FinalizeScmUpgradeRequestProto finalizeScmUpgradeRequest = 36;
++ optional QueryUpgradeFinalizationProgressRequestProto queryUpgradeFinalizationProgressRequest = 37;
}
message ScmContainerLocationResponse {
@@@ -112,9 -112,9 +114,11 @@@
optional DatanodeUsageInfoResponseProto DatanodeUsageInfoResponse = 30;
optional GetExistContainerWithPipelinesInBatchResponseProto getExistContainerWithPipelinesInBatchResponse = 31;
optional GetContainerTokenResponseProto containerTokenResponse = 32;
- optional FinalizeScmUpgradeResponseProto finalizeScmUpgradeResponse = 33;
- optional QueryUpgradeFinalizationProgressResponseProto
- queryUpgradeFinalizationProgressResponse = 34;
+ optional StartContainerBalancerResponseProto startContainerBalancerResponse = 33;
+ optional StopContainerBalancerResponseProto stopContainerBalancerResponse = 34;
+ optional ContainerBalancerStatusResponseProto containerBalancerStatusResponse = 35;
++ optional FinalizeScmUpgradeResponseProto finalizeScmUpgradeResponse = 36;
++ optional QueryUpgradeFinalizationProgressResponseProto queryUpgradeFinalizationProgressResponse = 37;
enum Status {
OK = 1;
@@@ -153,8 -153,9 +157,11 @@@ enum Type
DatanodeUsageInfo = 25;
GetExistContainerWithPipelinesInBatch = 26;
GetContainerToken = 27;
- FinalizeScmUpgrade = 28;
- QueryUpgradeFinalizationProgress = 29;
+ StartContainerBalancer = 28;
+ StopContainerBalancer = 29;
+ GetContainerBalancerStatus = 30;
++ FinalizeScmUpgrade = 31;
++ QueryUpgradeFinalizationProgress = 32;
}
/**
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 6f9030c,a374766..d9872ce
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@@ -671,37 -675,64 +677,96 @@@ public class SCMClientProtocolServer im
}
@Override
+ public StatusAndMessages finalizeScmUpgrade(String upgradeClientID) throws
+ IOException {
+ // check admin authorization
+ try {
+ getScm().checkAdminAccess(getRemoteUser());
+ } catch (IOException e) {
+ LOG.error("Authorization failed for finalize scm upgrade", e);
+ throw e;
+ }
+ return scm.finalizeUpgrade(upgradeClientID);
+ }
+
+ @Override
+ public StatusAndMessages queryUpgradeFinalizationProgress(
+ String upgradeClientID, boolean force, boolean readonly)
+ throws IOException {
+ if (!readonly) {
+ // check admin authorization
+ try {
+ getScm().checkAdminAccess(getRemoteUser());
+ } catch (IOException e) {
+ LOG.error("Authorization failed for query scm upgrade finalization " +
+ "progress", e);
+ throw e;
+ }
+ }
+
+ return scm.queryUpgradeFinalizationProgress(upgradeClientID, force,
+ readonly);
+ }
+
++ @Override
+ public boolean startContainerBalancer(Optional<Double> threshold,
+ Optional<Integer> idleiterations,
+ Optional<Integer> maxDatanodesToBalance,
+ Optional<Long> maxSizeToMoveInGB) throws IOException{
+ getScm().checkAdminAccess(getRemoteUser());
+ ContainerBalancerConfiguration cbc = new ContainerBalancerConfiguration();
+ if (threshold.isPresent()) {
+ double tsd = threshold.get();
+ Preconditions.checkState(tsd >= 0.0D && tsd < 1.0D,
+ "threshold should to be specified in range [0.0, 1.0).");
+ cbc.setThreshold(tsd);
+ }
+ if (maxSizeToMoveInGB.isPresent()) {
+ long mstm = maxSizeToMoveInGB.get();
+ Preconditions.checkState(mstm > 0,
+ "maxSizeToMoveInGB must be positive.");
+ cbc.setMaxSizeToMove(mstm * OzoneConsts.GB);
+ }
+ if (maxDatanodesToBalance.isPresent()) {
+ int mdtb = maxDatanodesToBalance.get();
+ Preconditions.checkState(mdtb > 0,
+ "maxDatanodesToBalance must be positive.");
+ cbc.setMaxDatanodesToBalance(mdtb);
+ }
+ if (idleiterations.isPresent()) {
+ int idi = idleiterations.get();
+ Preconditions.checkState(idi > 0 || idi == -1,
+ "idleiterations must be positive or" +
+ " -1(infinitly run container balancer).");
+ cbc.setIdleIteration(idi);
+ }
+
+ boolean isStartedSuccessfully = scm.getContainerBalancer().start(cbc);
+ if (isStartedSuccessfully) {
+ AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+ SCMAction.START_CONTAINER_BALANCER, null));
+ } else {
+ AUDIT.logWriteFailure(buildAuditMessageForSuccess(
+ SCMAction.START_CONTAINER_BALANCER, null));
+ }
+ return isStartedSuccessfully;
+ }
+
+ @Override
+ public void stopContainerBalancer() throws IOException {
+ getScm().checkAdminAccess(getRemoteUser());
+ AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+ SCMAction.STOP_CONTAINER_BALANCER, null));
+ scm.getContainerBalancer().stop();
+ }
+
+ @Override
+ public boolean getContainerBalancerStatus() {
+ AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+ SCMAction.GET_CONTAINER_BALANCER_STATUS, null));
+ return scm.getContainerBalancer().isBalancerRunning();
+ }
+
/**
* Get Datanode usage info such as capacity, SCMUsed, and remaining by ip
* or uuid.
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index 3439933,5a69121..745be9d
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@@ -20,8 -20,9 +20,10 @@@ package org.apache.hadoop.hdds.scm.serv
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.CRLStatusReport;
+ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.protocol.proto
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 0f8ea75,b75112c..109e5e4
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@@ -375,19 -390,13 +404,17 @@@ public final class StorageContainerMana
pipelineManager, containerManager);
StartDatanodeAdminHandler datanodeStartAdminHandler =
new StartDatanodeAdminHandler(scmNodeManager, pipelineManager);
- NonHealthyToHealthyNodeHandler nonHealthyToHealthyNodeHandler =
- new NonHealthyToHealthyNodeHandler(configuration, serviceManager);
+ ReadOnlyHealthyToHealthyNodeHandler readOnlyHealthyToHealthyNodeHandler =
- new ReadOnlyHealthyToHealthyNodeHandler(conf, serviceManager);
++ new ReadOnlyHealthyToHealthyNodeHandler(configuration, serviceManager);
+ HealthyReadOnlyNodeHandler
+ healthyReadOnlyNodeHandler =
+ new HealthyReadOnlyNodeHandler(scmNodeManager,
- pipelineManager, conf);
++ pipelineManager, configuration);
ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
- PendingDeleteHandler pendingDeleteHandler =
- new PendingDeleteHandler(scmBlockManager.getSCMBlockDeletingService());
ContainerReportHandler containerReportHandler =
- new ContainerReportHandler(
- scmNodeManager, containerManager, scmContext, conf);
+ new ContainerReportHandler(scmNodeManager, containerManager,
+ scmContext, configuration);
IncrementalContainerReportHandler incrementalContainerReportHandler =
new IncrementalContainerReportHandler(
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index af0538a,4d364ef..db145c7
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@@ -226,16 -232,23 +232,32 @@@ public final class TestUtils
StorageTypeProto.DISK);
}
+ /**
+ * Generates random metadata storage report.
+ *
+ * @param path path of the storage
+ *
+ * @return MetadataStorageReportProto
+ */
+ public static MetadataStorageReportProto getRandomMetadataStorageReport(
+ String path) {
+ return createMetadataStorageReport(path,
+ random.nextInt(1000),
+ random.nextInt(500),
+ random.nextInt(500),
+ StorageTypeProto.DISK);
+ }
+
public static StorageReportProto createStorageReport(UUID nodeId, String path,
+ long capacity) {
+ return createStorageReport(nodeId, path,
+ capacity,
+ 0,
+ capacity,
+ StorageTypeProto.DISK);
+ }
+
+ public static StorageReportProto createStorageReport(UUID nodeId, String path,
long capacity, long used, long remaining, StorageTypeProto type) {
return createStorageReport(nodeId, path, capacity, used, remaining,
type, false);
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java
index 33272bd,f1ff0d9..411e59f
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java
@@@ -34,9 -35,10 +35,11 @@@ import org.apache.hadoop.hdds.scm.node.
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.ozone.container.upgrade.UpgradeUtils;
import org.junit.Assert;
import org.junit.Test;
+
+ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN;
import static org.mockito.Matchers.anyObject;
import org.mockito.Mockito;
import static org.mockito.Mockito.when;
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java
index 934d986,d91f733..436a1e8
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java
@@@ -16,9 -16,9 +16,11 @@@
*/
package org.apache.hadoop.hdds.scm.node;
+import static org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager.maxLayoutVersion;
+
import java.io.IOException;
+ import java.util.Arrays;
+ import java.util.List;
import java.util.UUID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@@ -52,10 -52,11 +55,12 @@@ public class TestNodeReportHandler impl
private static final Logger LOG = LoggerFactory
.getLogger(TestNodeReportHandler.class);
private NodeReportHandler nodeReportHandler;
+ private HDDSLayoutVersionManager versionManager;
private SCMNodeManager nodeManager;
private String storagePath = GenericTestUtils.getRandomizedTempPath()
- .concat("/" + UUID.randomUUID().toString());
+ .concat("/data-" + UUID.randomUUID().toString());
+ private String metaStoragePath = GenericTestUtils.getRandomizedTempPath()
+ .concat("/metadata-" + UUID.randomUUID().toString());
@Before
public void resetEventCollector() throws IOException {
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index 80886cd,17c6a0f..4554b53
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@@ -231,241 -176,6 +231,242 @@@ public class TestSCMNodeManager
}
/**
+ * Tests that node manager handles layout version changes from heartbeats
+ * correctly.
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void testScmLayoutOnHeartbeat() throws Exception {
+ OzoneConfiguration conf = getConf();
+ conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL,
+ 1, TimeUnit.DAYS);
+
+ try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+ Assert.assertTrue(scm.checkLeader());
+ // Register 2 nodes correctly.
+ // These will be used with a faulty node to test pipeline creation.
+ DatanodeDetails goodNode1 = registerWithCapacity(nodeManager);
+ DatanodeDetails goodNode2 = registerWithCapacity(nodeManager);
+
+ scm.exitSafeMode();
+
+ assertPipelineClosedAfterLayoutHeartbeat(goodNode1, goodNode2,
+ nodeManager, SMALLER_MLV_LAYOUT_PROTO);
+ assertPipelineClosedAfterLayoutHeartbeat(goodNode1, goodNode2,
+ nodeManager, LARGER_MLV_SLV_LAYOUT_PROTO);
+ assertPipelineClosedAfterLayoutHeartbeat(goodNode1, goodNode2,
+ nodeManager, SMALLER_MLV_SLV_LAYOUT_PROTO);
+ assertPipelineClosedAfterLayoutHeartbeat(goodNode1, goodNode2,
+ nodeManager, LARGER_SLV_LAYOUT_PROTO);
+ }
+ }
+
+ /**
+ * Create {@link DatanodeDetails} to register with {@code nodeManager}, and
+ * provide the datanode maximum capacity so that space used does not block
+ * pipeline creation.
+ * @return The created {@link DatanodeDetails}.
+ */
+ private DatanodeDetails registerWithCapacity(SCMNodeManager nodeManager) {
+ return registerWithCapacity(nodeManager,
+ UpgradeUtils.defaultLayoutVersionProto(), success);
+ }
+
+ /**
+ * Create {@link DatanodeDetails} to register with {@code nodeManager}, and
+ * provide the datanode maximum capacity so that space used does not block
+ * pipeline creation. Also check that the result of registering matched
+ * {@code expectedResult}.
+ * @return The created {@link DatanodeDetails}.
+ */
+ private DatanodeDetails registerWithCapacity(SCMNodeManager nodeManager,
+ LayoutVersionProto layout, ErrorCode expectedResult) {
+ DatanodeDetails details = MockDatanodeDetails.randomDatanodeDetails();
+ StorageReportProto storageReport =
+ TestUtils.createStorageReport(details.getUuid(),
+ details.getNetworkFullPath(), Long.MAX_VALUE);
+ RegisteredCommand cmd = nodeManager.register(
+ MockDatanodeDetails.randomDatanodeDetails(),
- TestUtils.createNodeReport(storageReport),
++ TestUtils.createNodeReport(Arrays.asList(storageReport),
++ Collections.emptyList()),
+ getRandomPipelineReports(), layout);
+
+ Assert.assertEquals(expectedResult, cmd.getError());
+ return cmd.getDatanode();
+ }
+
+ private void assertPipelineClosedAfterLayoutHeartbeat(
+ DatanodeDetails originalNode1, DatanodeDetails originalNode2,
+ SCMNodeManager nodeManager, LayoutVersionProto layout) throws Exception {
+
+ List<DatanodeDetails> originalNodes =
+ Arrays.asList(originalNode1, originalNode2);
+
+ // Initial condition: 2 healthy nodes registered.
+ assertPipelines(HddsProtos.ReplicationFactor.ONE, count -> count == 2,
+ originalNodes);
+ assertPipelines(HddsProtos.ReplicationFactor.THREE,
+ count -> count == 0, new ArrayList<>());
+
+ // Even when safemode exit or new node addition trigger pipeline
+ // creation, they will fail with not enough healthy nodes for ratis 3
+ // pipeline. Therefore we do not have to worry about this create call
+ // failing due to datanodes reaching their maximum pipeline limit.
+ assertPipelineCreationFailsWithNotEnoughNodes(2);
+
+ // Register a new node correctly.
+ DatanodeDetails node = registerWithCapacity(nodeManager);
+
+ List<DatanodeDetails> allNodes = new ArrayList<>(originalNodes);
+ allNodes.add(node);
+
+ // Safemode exit and adding the new node should trigger pipeline creation.
+ assertPipelines(HddsProtos.ReplicationFactor.ONE, count -> count == 3,
+ allNodes);
+ assertPipelines(HddsProtos.ReplicationFactor.THREE, count -> count >= 1,
+ allNodes);
+
+ // node sends incorrect layout.
+ nodeManager.processHeartbeat(node, layout);
+
+ // Its pipelines should be closed then removed, meaning there is not
+ // enough nodes for factor 3 pipelines.
+ assertPipelines(HddsProtos.ReplicationFactor.ONE, count -> count == 2,
+ originalNodes);
+ assertPipelines(HddsProtos.ReplicationFactor.THREE,
+ count -> count == 0, new ArrayList<>());
+
+ assertPipelineCreationFailsWithNotEnoughNodes(2);
+ }
+
+ /**
+ * Tests that node manager handles layout versions for newly registered nodes
+ * correctly.
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws TimeoutException
+ */
+ @Test
+ public void testScmLayoutOnRegister()
+ throws Exception {
+
+ OzoneConfiguration conf = getConf();
+ conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL,
+ 1, TimeUnit.DAYS);
+
+ try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+ Assert.assertTrue(scm.checkLeader());
+ // Nodes with mismatched SLV cannot join the cluster.
+ registerWithCapacity(nodeManager,
+ LARGER_SLV_LAYOUT_PROTO, errorNodeNotPermitted);
+ registerWithCapacity(nodeManager,
+ SMALLER_MLV_SLV_LAYOUT_PROTO, errorNodeNotPermitted);
+ registerWithCapacity(nodeManager,
+ LARGER_MLV_SLV_LAYOUT_PROTO, errorNodeNotPermitted);
+ // Nodes with mismatched MLV can join, but should not be allowed in
+ // pipelines.
+ DatanodeDetails badMlvNode1 = registerWithCapacity(nodeManager,
+ SMALLER_MLV_LAYOUT_PROTO, success);
+ DatanodeDetails badMlvNode2 = registerWithCapacity(nodeManager,
+ SMALLER_MLV_LAYOUT_PROTO, success);
+ // This node has correct MLV and SLV, so it can join and be used in
+ // pipelines.
+ DatanodeDetails goodNode = registerWithCapacity(nodeManager,
+ CORRECT_LAYOUT_PROTO, success);
+
+ Assert.assertEquals(3, nodeManager.getAllNodes().size());
+
+ scm.exitSafeMode();
+
+ // SCM should auto create a factor 1 pipeline for the one healthy node.
+ // Still should not have enough healthy nodes for ratis 3 pipeline.
+ assertPipelines(HddsProtos.ReplicationFactor.ONE,
+ count -> count == 1,
+ Collections.singletonList(goodNode));
+ assertPipelines(HddsProtos.ReplicationFactor.THREE,
+ count -> count == 0,
+ new ArrayList<>());
+
+ // Even when safemode exit or new node addition trigger pipeline
+ // creation, they will fail with not enough healthy nodes for ratis 3
+ // pipeline. Therefore we do not have to worry about this create call
+ // failing due to datanodes reaching their maximum pipeline limit.
+ assertPipelineCreationFailsWithNotEnoughNodes(1);
+
+ // Heartbeat bad MLV nodes back to healthy.
+ nodeManager.processHeartbeat(badMlvNode1, CORRECT_LAYOUT_PROTO);
+ nodeManager.processHeartbeat(badMlvNode2, CORRECT_LAYOUT_PROTO);
+
+ // After moving out of healthy readonly, pipeline creation should be
+ // triggered.
+ assertPipelines(HddsProtos.ReplicationFactor.ONE,
+ count -> count == 3,
+ Arrays.asList(badMlvNode1, badMlvNode2, goodNode));
+ assertPipelines(HddsProtos.ReplicationFactor.THREE,
+ count -> count >= 1,
+ Arrays.asList(badMlvNode1, badMlvNode2, goodNode));
+ }
+ }
+
+ private void assertPipelineCreationFailsWithNotEnoughNodes(
+ int actualNodeCount) throws Exception {
+ try {
+ ReplicationConfig ratisThree =
+ ReplicationConfig.fromTypeAndFactor(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE);
+ scm.getPipelineManager().createPipeline(ratisThree);
+ Assert.fail("3 nodes should not have been found for a pipeline.");
+ } catch (SCMException ex) {
+ Assert.assertTrue(ex.getMessage().contains("Required 3. Found " +
+ actualNodeCount));
+ }
+ }
+
+ private void assertPipelines(HddsProtos.ReplicationFactor factor,
+ Predicate<Integer> countCheck, Collection<DatanodeDetails> allowedDNs)
+ throws Exception {
+
+ Set<String> allowedDnIds = allowedDNs.stream()
+ .map(DatanodeDetails::getUuidString)
+ .collect(Collectors.toSet());
+
+ RatisReplicationConfig replConfig = new RatisReplicationConfig(factor);
+
+ // Wait for the expected number of pipelines using allowed DNs.
+ GenericTestUtils.waitFor(() -> {
+ List<Pipeline> pipelines = scm.getPipelineManager()
+ .getPipelines(replConfig);
+ LOG.info("Found {} pipelines of type {} and factor {}.", pipelines.size(),
+ replConfig.getReplicationType(), replConfig.getReplicationFactor());
+ boolean success = countCheck.test(pipelines.size());
+
+ // If we have the correct number of pipelines, make sure that none of
+ // these pipelines use nodes outside of allowedDNs.
+ if (success) {
+ for (Pipeline pipeline: pipelines) {
+ for(DatanodeDetails pipelineDN: pipeline.getNodes()) {
+ // Do not wait for this condition to be true. Disallowed DNs should
+ // never be used once we have the expected number of pipelines.
+ if (!allowedDnIds.contains(pipelineDN.getUuidString())) {
+ String message = String.format("Pipeline %s used datanode %s " +
+ "which is not in the set of allowed datanodes: %s",
+ pipeline.getId().toString(), pipelineDN.getUuidString(),
+ allowedDnIds.toString());
+ Assert.fail(message);
+ }
+ }
+ }
+ }
+
+ return success;
+ }, 1000, 10000);
+ }
+
+ /**
* asserts that if we send no heartbeats node manager stays in safemode.
*
* @throws IOException
@@@ -1338,8 -937,9 +1339,9 @@@
String storagePath = testDir.getAbsolutePath() + "/" + dnId;
StorageReportProto report = TestUtils
.createStorageReport(dnId, storagePath, capacity, used, free, null);
- nodeManager.register(dn, TestUtils.createNodeReport(report), null);
+ nodeManager.register(dn, TestUtils.createNodeReport(
+ Arrays.asList(report), Collections.emptyList()), null);
- nodeManager.processHeartbeat(dn);
+ nodeManager.processHeartbeat(dn, layoutInfo);
}
//TODO: wait for EventQueue to be processed
eventQueue.processAll(8000L);
@@@ -1386,17 -986,13 +1388,18 @@@
for (int x = 0; x < volumeCount; x++) {
String storagePath = testDir.getAbsolutePath() + "/" + dnId;
reports.add(TestUtils
- .createStorageReport(dnId, storagePath, capacity,
- used, free, null, failed));
+ .createStorageReport(dnId, storagePath, capacity,
+ used, free, null, failed));
failed = !failed;
}
- nodeManager.register(dn, TestUtils.createNodeReport(reports), null);
+ nodeManager.register(dn, TestUtils.createNodeReport(reports,
+ Collections.emptyList()), null);
- nodeManager.processHeartbeat(dn);
+ LayoutVersionManager versionManager =
+ nodeManager.getLayoutVersionManager();
+ LayoutVersionProto layoutInfo = toLayoutVersionProto(
+ versionManager.getMetadataLayoutVersion(),
+ versionManager.getSoftwareLayoutVersion());
+ nodeManager.processHeartbeat(dn, layoutInfo);
//TODO: wait for EventQueue to be processed
eventQueue.processAll(8000L);
@@@ -1446,16 -1042,12 +1449,17 @@@
StorageReportProto report = TestUtils
.createStorageReport(dnId, storagePath, capacity, scmUsed,
remaining, null);
- NodeReportProto nodeReportProto = TestUtils.createNodeReport(report);
+ NodeReportProto nodeReportProto = TestUtils.createNodeReport(
+ Arrays.asList(report), Collections.emptyList());
nodeReportHandler.onMessage(
- new NodeReportFromDatanode(datanodeDetails, nodeReportProto),
- publisher);
- nodeManager.processHeartbeat(datanodeDetails);
+ new NodeReportFromDatanode(datanodeDetails, nodeReportProto),
+ publisher);
+ LayoutVersionManager versionManager =
+ nodeManager.getLayoutVersionManager();
+ LayoutVersionProto layoutInfo = toLayoutVersionProto(
+ versionManager.getMetadataLayoutVersion(),
+ versionManager.getSoftwareLayoutVersion());
+ nodeManager.processHeartbeat(datanodeDetails, layoutInfo);
Thread.sleep(100);
}
@@@ -1759,15 -1342,11 +1765,20 @@@
.createStorageReport(dnId, storagePath, capacity, used,
remaining, null);
+ nodeManager.register(datanodeDetails, TestUtils.createNodeReport(
+ Arrays.asList(report), Collections.emptyList()),
+ TestUtils.getRandomPipelineReports());
- nodeManager.processHeartbeat(datanodeDetails);
+ LayoutVersionManager versionManager =
+ nodeManager.getLayoutVersionManager();
+ LayoutVersionProto layoutInfo = toLayoutVersionProto(
+ versionManager.getMetadataLayoutVersion(),
+ versionManager.getSoftwareLayoutVersion());
- nodeManager.register(datanodeDetails, TestUtils.createNodeReport(report),
++ nodeManager.register(datanodeDetails,
++ TestUtils.createNodeReport(Arrays.asList(report),
++ Collections.emptyList()),
+ TestUtils.getRandomPipelineReports(), layoutInfo);
+ nodeManager.processHeartbeat(datanodeDetails, layoutInfo);
if (i == 5) {
nodeManager.setNodeOperationalState(datanodeDetails,
HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE);
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index 1dcefa4,4a91502..85643d8
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@@ -275,10 -274,12 +277,13 @@@ public class TestEndPoint
SCMRegisteredResponseProto responseProto = rpcEndPoint.getEndPoint()
.register(nodeToRegister.getExtendedProtoBufMessage(), TestUtils
.createNodeReport(
- getStorageReports(nodeToRegister.getUuid())),
+ Arrays.asList(getStorageReports(
+ nodeToRegister.getUuid())),
+ Arrays.asList(getMetadataStorageReports(
+ nodeToRegister.getUuid()))),
TestUtils.getRandomContainerReports(10),
- TestUtils.getRandomPipelineReports());
+ TestUtils.getRandomPipelineReports(),
+ defaultLayoutVersionProto());
Assert.assertNotNull(responseProto);
Assert.assertEquals(nodeToRegister.getUuidString(),
responseProto.getDatanodeUUID());
@@@ -294,7 -295,14 +299,13 @@@
return TestUtils.createStorageReport(id, storagePath, 100, 10, 90, null);
}
+ private MetadataStorageReportProto getMetadataStorageReports(UUID id) {
+ String storagePath = testDir.getAbsolutePath() + "/metadata-" + id;
+ return TestUtils.createMetadataStorageReport(storagePath, 100, 10, 90,
+ null);
+ }
+
- private EndpointStateMachine registerTaskHelper(
- InetSocketAddress scmAddress,
+ private EndpointStateMachine registerTaskHelper(InetSocketAddress scmAddress,
int rpcTimeout, boolean clearDatanodeDetails
) throws Exception {
OzoneConfiguration conf = SCMTestUtils.getConf();
diff --cc hadoop-ozone/dist/src/main/compose/upgrade/compose/non-ha/docker-config
index a6d4482,69e7b91..1a7419c
--- a/hadoop-ozone/dist/src/main/compose/upgrade/compose/non-ha/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/upgrade/compose/non-ha/docker-config
@@@ -14,13 -14,12 +14,14 @@@
# See the License for the specific language governing permissions and
# limitations under the License.
-CORE-SITE.XML_fs.ofs.impl=org.apache.hadoop.fs.ozone.RootedOzoneFileSystem
-CORE-SITE.XML_fs.o3fs.impl=org.apache.hadoop.fs.ozone.OzoneFileSystem
+OZONE-SITE.XML_ozone.metadata.dirs=/data/metadata
+
+OZONE-SITE.XML_ozone.client.failover.max.attempts=6
+
OZONE-SITE.XML_ozone.om.address=om
OZONE-SITE.XML_ozone.om.http-address=om:9874
-
+ OZONE-SITE.XML_ozone.scm.container.size=1GB
+ OZONE-SITE.XML_ozone.scm.datanode.ratis.volume.free-space.min=10MB
OZONE-SITE.XML_ozone.scm.pipeline.creation.interval=30s
OZONE-SITE.XML_ozone.scm.pipeline.owner.container.count=1
OZONE-SITE.XML_ozone.scm.names=scm
diff --cc hadoop-ozone/dist/src/main/compose/upgrade/test.sh
index 2857c1a,0aa1419..a323310
--- a/hadoop-ozone/dist/src/main/compose/upgrade/test.sh
+++ b/hadoop-ozone/dist/src/main/compose/upgrade/test.sh
@@@ -15,27 -15,18 +15,30 @@@
# See the License for the specific language governing permissions and
# limitations under the License.
-SCRIPT_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )
-ALL_RESULT_DIR="$SCRIPT_DIR/result"
-mkdir -p "$ALL_RESULT_DIR"
-rm "$ALL_RESULT_DIR"/* || true
-source "$SCRIPT_DIR/../testlib.sh"
+# Version that will be run using the local build.
+: "${OZONE_CURRENT_VERSION:=1.1.0}"
+export OZONE_CURRENT_VERSION
-tests=$(find_tests)
-cd "$SCRIPT_DIR"
+TEST_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )
+source "$TEST_DIR/testlib.sh"
+
+# Export variables needed by tests and ../testlib.sh.
+export TEST_DIR
+export COMPOSE_DIR="$TEST_DIR"
+ RESULT=0
+ run_test_scripts ${tests} || RESULT=$?
+
-generate_report "upgrade" "${ALL_RESULT_DIR}"
+RESULT_DIR="$ALL_RESULT_DIR" create_results_dir
+
+# Upgrade tests to be run.
+# Run all upgrades even if one fails.
+# Any failure will save a failing return code to $RESULT.
+set +e
+run_test manual-upgrade 0.5.0 1.1.0
+run_test non-rolling-upgrade 1.0.0 1.1.0
+set -e
+
+generate_report "upgrade" "$ALL_RESULT_DIR"
-exit ${RESULT}
+exit "$RESULT"
diff --cc hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOpenContainerCount.java
index 0000000,2647c9a..0e66553
mode 000000,100644..100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOpenContainerCount.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOpenContainerCount.java
@@@ -1,0 -1,424 +1,427 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.hadoop.ozone.recon.api;
+
+ import org.apache.hadoop.ozone.OzoneConsts;
+ import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+ import org.apache.hadoop.hdds.protocol.proto.HddsProtos
+ .ExtendedDatanodeDetailsProto;
+ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.PipelineID;
+ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
+ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
+ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageTypeProto;
+ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto.Builder;
+ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
+ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
+ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
+ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+ import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+ import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
+ import org.apache.hadoop.hdfs.web.URLConnectionFactory;
+ import org.apache.hadoop.ozone.recon.MetricsServiceProviderFactory;
+ import org.apache.hadoop.ozone.recon.ReconTestInjector;
+ import org.apache.hadoop.ozone.recon.ReconUtils;
+ import org.apache.hadoop.ozone.recon.api.types.DatanodeMetadata;
+ import org.apache.hadoop.ozone.recon.api.types.DatanodesResponse;
+ import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
+ import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+ import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
+ import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
+ import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
+ import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl;
+ import org.apache.ozone.test.LambdaTestUtils;
+ import org.junit.Assert;
+ import org.junit.Before;
+ import org.junit.Rule;
+ import org.junit.Test;
+ import org.junit.rules.TemporaryFolder;
+
+ import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
++import static org.apache.hadoop.ozone.container.upgrade.UpgradeUtils.defaultLayoutVersionProto;
+ import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getRandomPipeline;
+ import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager;
+ import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.initializeNewOmMetadataManager;
+ import static org.mockito.ArgumentMatchers.any;
+ import static org.mockito.ArgumentMatchers.anyBoolean;
+ import static org.mockito.ArgumentMatchers.anyString;
+ import static org.mockito.Mockito.mock;
+ import static org.mockito.Mockito.when;
+
+ import javax.servlet.http.HttpServletResponse;
+ import javax.ws.rs.core.Response;
+
+ import java.io.IOException;
+ import java.net.HttpURLConnection;
+ import java.util.*;
+ import java.util.concurrent.Callable;
+
+ /**
+ * Test for Open Container count per Datanode.
+ */
+ public class TestOpenContainerCount {
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private NodeEndpoint nodeEndpoint;
+ private ReconOMMetadataManager reconOMMetadataManager;
+ private ReconStorageContainerManagerFacade reconScm;
+ private boolean isSetupDone = false;
+ private String pipelineId;
+ private String pipelineId2;
+ private DatanodeDetails datanodeDetails;
+ private String datanodeId;
+ private ContainerReportsProto containerReportsProto;
+ private Builder builder;
+ private ExtendedDatanodeDetailsProto extendedDatanodeDetailsProto;
+ private NodeReportProto nodeReportProto;
+ private PipelineReportsProto pipelineReportsProto;
+ private Pipeline pipeline;
+ private Pipeline pipeline2;
+ private static final String HOST1 = "host1.datanode";
+ private static final String IP1 = "1.1.1.1";
+ private ReconUtils reconUtilsMock;
+ private StorageContainerServiceProvider mockScmServiceProvider;
+
+ private List<Long> containerIDs;
+
+ private List<ContainerWithPipeline> cpw;
+
+ private void initializeInjector() throws Exception {
+ reconOMMetadataManager = getTestReconOmMetadataManager(
+ initializeNewOmMetadataManager(temporaryFolder.newFolder()),
+ temporaryFolder.newFolder());
+ datanodeDetails = randomDatanodeDetails();
+ datanodeDetails.setHostName(HOST1);
+ datanodeDetails.setIpAddress(IP1);
+ pipeline = getRandomPipeline(datanodeDetails);
+ pipelineId = pipeline.getId().getId().toString();
+
+ pipeline2 = getRandomPipeline(datanodeDetails);
+ pipelineId2 = pipeline2.getId().getId().toString();
+
+ StorageContainerLocationProtocol mockScmClient = mock(
+ StorageContainerLocationProtocol.class);
+ mockScmServiceProvider = mock(
+ StorageContainerServiceProviderImpl.class);
+
+ when(mockScmServiceProvider.getPipeline(
+ pipeline.getId().getProtobuf())).thenReturn(pipeline);
+ when(mockScmServiceProvider.getPipeline(
+ pipeline2.getId().getProtobuf())).thenReturn(pipeline2);
+
+ // Open 5 containers on pipeline 1
+ containerIDs = new LinkedList<>();
+ cpw = new LinkedList<>();
+ for (long i = 1L; i <= 5L; ++i) {
+ ContainerInfo containerInfo = new ContainerInfo.Builder()
+ .setContainerID(i)
+ .setReplicationConfig(
+ new RatisReplicationConfig(ReplicationFactor.ONE))
+ .setState(LifeCycleState.OPEN)
+ .setOwner("test")
+ .setPipelineID(pipeline.getId())
+ .build();
+ ContainerWithPipeline containerWithPipeline =
+ new ContainerWithPipeline(containerInfo, pipeline);
+ when(mockScmServiceProvider.getContainerWithPipeline(i))
+ .thenReturn(containerWithPipeline);
+ containerIDs.add(i);
+ cpw.add(containerWithPipeline);
+ }
+
+ // Open 5 containers on pipeline 2
+ for (long i = 6L; i <= 10L; ++i) {
+ ContainerInfo containerInfo = new ContainerInfo.Builder()
+ .setContainerID(i)
+ .setReplicationConfig(
+ new RatisReplicationConfig(ReplicationFactor.ONE))
+ .setState(LifeCycleState.OPEN)
+ .setOwner("test")
+ .setPipelineID(pipeline2.getId())
+ .build();
+ ContainerWithPipeline containerWithPipeline =
+ new ContainerWithPipeline(containerInfo, pipeline2);
+ when(mockScmServiceProvider.getContainerWithPipeline(i))
+ .thenReturn(containerWithPipeline);
+ containerIDs.add(i);
+ cpw.add(containerWithPipeline);
+ }
+
+ when(mockScmServiceProvider
+ .getExistContainerWithPipelinesInBatch(containerIDs))
+ .thenReturn(cpw);
+
+ reconUtilsMock = mock(ReconUtils.class);
+ HttpURLConnection urlConnectionMock = mock(HttpURLConnection.class);
+ when(urlConnectionMock.getResponseCode())
+ .thenReturn(HttpServletResponse.SC_OK);
+ when(reconUtilsMock.makeHttpCall(any(URLConnectionFactory.class),
+ anyString(), anyBoolean())).thenReturn(urlConnectionMock);
+
+ ReconTestInjector reconTestInjector =
+ new ReconTestInjector.Builder(temporaryFolder)
+ .withReconSqlDb()
+ .withReconOm(reconOMMetadataManager)
+ .withOmServiceProvider(
+ mock(OzoneManagerServiceProviderImpl.class))
+ .addBinding(StorageContainerServiceProvider.class,
+ mockScmServiceProvider)
+ .addBinding(OzoneStorageContainerManager.class,
+ ReconStorageContainerManagerFacade.class)
+ .withContainerDB()
+ .addBinding(NodeEndpoint.class)
+ .addBinding(MetricsServiceProviderFactory.class)
+ .addBinding(ContainerHealthSchemaManager.class)
+ .addBinding(ReconUtils.class, reconUtilsMock)
+ .addBinding(StorageContainerLocationProtocol.class,
+ mockScmClient)
+ .build();
+
+ nodeEndpoint = reconTestInjector.getInstance(NodeEndpoint.class);
+ reconScm = (ReconStorageContainerManagerFacade)
+ reconTestInjector.getInstance(OzoneStorageContainerManager.class);
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ // The following setup runs only once
+ if (!isSetupDone) {
+ initializeInjector();
+ isSetupDone = true;
+ }
+ datanodeId = datanodeDetails.getUuid().toString();
+
+ // initialize container report
+ builder = ContainerReportsProto.newBuilder();
+ for (long i = 1L; i <= 10L; i++) {
+ builder.addReports(
+ ContainerReplicaProto.newBuilder()
+ .setContainerID(i)
+ .setState(ContainerReplicaProto.State.OPEN)
+ .setOriginNodeId(datanodeId)
+ .build()
+ );
+ }
+ containerReportsProto = builder.build();
+
+ UUID pipelineUuid = UUID.fromString(pipelineId);
+ HddsProtos.UUID uuid128 = HddsProtos.UUID.newBuilder()
+ .setMostSigBits(pipelineUuid.getMostSignificantBits())
+ .setLeastSigBits(pipelineUuid.getLeastSignificantBits())
+ .build();
+
+ UUID pipelineUuid2 = UUID.fromString(pipelineId2);
+ HddsProtos.UUID uuid1282 = HddsProtos.UUID.newBuilder()
+ .setMostSigBits(pipelineUuid2.getMostSignificantBits())
+ .setLeastSigBits(pipelineUuid2.getLeastSignificantBits())
+ .build();
+
+ PipelineReport pipelineReport = PipelineReport.newBuilder()
+ .setPipelineID(
+ PipelineID.newBuilder()
+ .setId(pipelineId)
+ .setUuid128(uuid128)
+ .build())
+ .setIsLeader(true)
+ .build();
+
+ PipelineReport pipelineReport2 = PipelineReport.newBuilder()
+ .setPipelineID(
+ PipelineID
+ .newBuilder()
+ .setId(pipelineId2)
+ .setUuid128(uuid1282)
+ .build())
+ .setIsLeader(false)
+ .build();
+
+ pipelineReportsProto =
+ PipelineReportsProto.newBuilder()
+ .addPipelineReport(pipelineReport)
+ .addPipelineReport(pipelineReport2)
+ .build();
+
+ DatanodeDetailsProto datanodeDetailsProto =
+ DatanodeDetailsProto.newBuilder()
+ .setHostName(HOST1)
+ .setUuid(datanodeId)
+ .setIpAddress(IP1)
+ .build();
+
+ extendedDatanodeDetailsProto =
+ HddsProtos.ExtendedDatanodeDetailsProto.newBuilder()
+ .setDatanodeDetails(datanodeDetailsProto)
+ .setVersion("0.6.0")
+ .setSetupTime(1596347628802L)
+ .setBuildDate("2020-08-01T08:50Z")
+ .setRevision("3346f493fa1690358add7bb9f3e5b52545993f36")
+ .build();
+
+ StorageReportProto storageReportProto1 =
+ StorageReportProto.newBuilder()
+ .setStorageType(StorageTypeProto.DISK)
+ .setStorageLocation("/disk1")
+ .setScmUsed(10 * OzoneConsts.GB)
+ .setRemaining(90 * OzoneConsts.GB)
+ .setCapacity(100 * OzoneConsts.GB)
+ .setStorageUuid(UUID.randomUUID().toString())
+ .setFailed(false).build();
+
+ StorageReportProto storageReportProto2 =
+ StorageReportProto.newBuilder()
+ .setStorageType(StorageTypeProto.DISK)
+ .setStorageLocation("/disk2")
+ .setScmUsed(10 * OzoneConsts.GB)
+ .setRemaining(90 * OzoneConsts.GB)
+ .setCapacity(100 * OzoneConsts.GB)
+ .setStorageUuid(UUID.randomUUID().toString())
+ .setFailed(false).build();
+
+ nodeReportProto =
+ NodeReportProto.newBuilder()
+ .addStorageReport(storageReportProto1)
+ .addStorageReport(storageReportProto2).build();
+
+ try {
+ reconScm.getDatanodeProtocolServer()
+ .register(extendedDatanodeDetailsProto, nodeReportProto,
- containerReportsProto, pipelineReportsProto);
++ containerReportsProto, pipelineReportsProto,
++ defaultLayoutVersionProto());
+ // Process all events in the event queue
+ reconScm.getEventQueue().processAll(1000);
+ } catch (Exception ex) {
+ Assert.fail(ex.getMessage());
+ }
+ }
+
+ @Test
+ public void testOpenContainerCount() throws Exception {
+ // In case of pipeline doesn't exist
+ waitAndCheckConditionAfterHeartbeat(() -> {
+
+ DatanodeMetadata datanodeMetadata1 = getDatanodeMetadata();
+ return datanodeMetadata1.getContainers() == 10
+ && datanodeMetadata1.getPipelines().size() == 2;
+ });
+
+ DatanodeMetadata datanodeMetadata = getDatanodeMetadata();
+
+ int expectedCnt = datanodeMetadata.getOpenContainers();
+
+ // check if open container's count decrement according
+ for (long id = 1L; id <= 10L; ++id) {
+ --expectedCnt;
+ closeContainer(id);
+ DatanodeMetadata metadata = getDatanodeMetadata();
+ Assert.assertEquals(expectedCnt, metadata.getOpenContainers());
+ }
+ }
+
+ private DatanodeMetadata getDatanodeMetadata() {
+ Response response = nodeEndpoint.getDatanodes();
+ DatanodesResponse datanodesResponse =
+ (DatanodesResponse) response.getEntity();
+
+ DatanodeMetadata datanodeMetadata =
+ datanodesResponse.getDatanodes().stream().filter(metadata ->
+ metadata.getHostname().equals("host1.datanode"))
+ .findFirst().orElse(null);
+ return datanodeMetadata;
+ }
+
+ private void closeContainer(long containerID) throws IOException {
+
+ if (containerID >= 1L && containerID <= 5L) {
+ ContainerInfo closedContainer = new ContainerInfo.Builder()
+ .setContainerID(containerID)
+ .setReplicationConfig(
+ new RatisReplicationConfig(ReplicationFactor.ONE))
+ .setState(LifeCycleState.CLOSED)
+ .setOwner("test")
+ .setPipelineID(pipeline.getId())
+ .build();
+ ContainerWithPipeline containerWithPipeline =
+ new ContainerWithPipeline(closedContainer, pipeline);
+ when(mockScmServiceProvider.getContainerWithPipeline(containerID))
+ .thenReturn(containerWithPipeline);
+ cpw.set((int) containerID - 1, containerWithPipeline);
+ } else if (containerID >= 6L && containerID <= 10L) {
+ ContainerInfo closedContainer = new ContainerInfo.Builder()
+ .setContainerID(containerID)
+ .setReplicationConfig(
+ new RatisReplicationConfig(ReplicationFactor.ONE))
+ .setState(LifeCycleState.CLOSED)
+ .setOwner("test")
+ .setPipelineID(pipeline2.getId())
+ .build();
+ ContainerWithPipeline containerWithPipeline =
+ new ContainerWithPipeline(closedContainer, pipeline2);
+ when(mockScmServiceProvider.getContainerWithPipeline(containerID))
+ .thenReturn(containerWithPipeline);
+ cpw.set((int) containerID - 1, containerWithPipeline);
+ }
+ when(mockScmServiceProvider
+ .getExistContainerWithPipelinesInBatch(containerIDs))
+ .thenReturn(cpw);
+ updateContainerReport(containerID);
+ }
+
+ private void updateContainerReport(long containerId) {
+ containerReportsProto = builder.setReports((int) containerId - 1,
+ ContainerReplicaProto.newBuilder()
+ .setContainerID(containerId)
+ .setState(ContainerReplicaProto.State.CLOSED)
+ .setOriginNodeId(datanodeId)
+ .build())
+ .build();
+ try {
+ reconScm.getDatanodeProtocolServer()
+ .register(extendedDatanodeDetailsProto, nodeReportProto,
- containerReportsProto, pipelineReportsProto);
++ containerReportsProto, pipelineReportsProto,
++ defaultLayoutVersionProto());
+ // Process all events in the event queue
+ reconScm.getEventQueue().processAll(1000);
+ } catch (Exception ex) {
+ Assert.fail(ex.getMessage());
+ }
+ }
+
+ private void waitAndCheckConditionAfterHeartbeat(Callable<Boolean> check)
+ throws Exception {
+ // if container report is processed first, and pipeline does not exist
+ // then container is not added until the next container report is processed
+ SCMHeartbeatRequestProto heartbeatRequestProto =
+ SCMHeartbeatRequestProto.newBuilder()
+ .setContainerReport(containerReportsProto)
+ .setDatanodeDetails(extendedDatanodeDetailsProto
+ .getDatanodeDetails())
+ .build();
+
+ reconScm.getDatanodeProtocolServer().sendHeartbeat(heartbeatRequestProto);
+ LambdaTestUtils.await(30000, 1000, check);
+ }
+ }
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org