You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by er...@apache.org on 2022/11/17 01:18:10 UTC
[ozone] branch master updated: HDDS-7095. allow on demand scanning for containers (#3788)
This is an automated email from the ASF dual-hosted git repository.
erose pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 95568ab735 HDDS-7095. allow on demand scanning for containers (#3788)
95568ab735 is described below
commit 95568ab735af635768e758bf5f9413d554fee6a7
Author: Galsza <10...@users.noreply.github.com>
AuthorDate: Thu Nov 17 02:18:05 2022 +0100
HDDS-7095. allow on demand scanning for containers (#3788)
---
.../container/common/impl/HddsDispatcher.java | 45 ++---
.../ozoneimpl/ContainerScannerConfiguration.java | 29 +++-
.../ozoneimpl/OnDemandContainerScanner.java | 178 ++++++++++++++++++++
.../ozoneimpl/OnDemandScannerMetrics.java | 42 +++++
.../ozone/container/ozoneimpl/OzoneContainer.java | 57 ++++---
.../ozone/container/common/ContainerTestUtils.java | 39 ++++-
.../TestContainerScannerConfiguration.java | 14 +-
.../ozoneimpl/TestContainerScannerMetrics.java | 74 ++++++---
.../ozoneimpl/TestOnDemandContainerScanner.java | 157 ++++++++++++++++++
.../hadoop/ozone/dn/scanner/TestDataScanner.java | 181 +++++++++++----------
10 files changed, 662 insertions(+), 154 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 6dde0181a8..5059a64291 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -18,11 +18,9 @@
package org.apache.hadoop.ozone.container.common.impl;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -58,19 +56,21 @@ import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerScanner;
import org.apache.hadoop.security.UserGroupInformation;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.ServiceException;
-import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest;
-import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
-
import org.apache.hadoop.util.Time;
import org.apache.ratis.thirdparty.com.google.protobuf.ProtocolMessageEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest;
+import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
+
/**
* Ozone Container dispatcher takes a call from the netty server and routes it
* to the right handler function.
@@ -332,11 +332,6 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
// state here.
Result result = responseProto.getResult();
- if (cmdType == Type.CreateContainer
- && result == Result.SUCCESS && dispatcherContext != null) {
- Preconditions.checkNotNull(dispatcherContext.getContainer2BCSIDMap());
- container2BCSIDMap.putIfAbsent(containerID, Long.valueOf(0));
- }
if (!HddsUtils.isReadOnly(msg) && !canIgnoreException(result)) {
// If the container is open/closing and the container operation
// has failed, it should be first marked unhealthy and the initiate the
@@ -373,11 +368,21 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
container.getContainerData().getState() == State.UNHEALTHY);
sendCloseContainerActionIfNeeded(container);
}
-
+ if (cmdType == Type.CreateContainer
+ && result == Result.SUCCESS && dispatcherContext != null) {
+ Preconditions.checkNotNull(dispatcherContext.getContainer2BCSIDMap());
+ container2BCSIDMap.putIfAbsent(containerID, Long.valueOf(0));
+ }
if (result == Result.SUCCESS) {
updateBCSID(container, dispatcherContext, cmdType);
audit(action, eventType, params, AuditEventStatus.SUCCESS, null);
} else {
+ //TODO HDDS-7096:
+ // This is a too general place for on demand scanning.
+ // Create a specific exception that signals for on demand scanning
+ // and move this general scan to where it is more appropriate.
+ // Add integration tests to test the full functionality.
+ OnDemandContainerScanner.scanContainer(container);
audit(action, eventType, params, AuditEventStatus.FAILURE,
new Exception(responseProto.getMessage()));
}
@@ -561,8 +566,8 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
private boolean isContainerUnhealthy(Container container) {
return Optional.ofNullable(container).map(
- cont -> (cont.getContainerState() ==
- ContainerDataProto.State.UNHEALTHY))
+ cont -> (cont.getContainerState() ==
+ ContainerDataProto.State.UNHEALTHY))
.orElse(Boolean.FALSE);
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScannerConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScannerConfiguration.java
index 4fa7773d1e..20c519bd6e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScannerConfiguration.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScannerConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigTag;
import org.apache.hadoop.hdds.conf.ConfigType;
import org.apache.hadoop.hdds.conf.PostConstruct;
+import org.apache.hadoop.ozone.OzoneConsts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,12 +46,17 @@ public class ContainerScannerConfiguration {
"hdds.container.scrub.data.scan.interval";
public static final String VOLUME_BYTES_PER_SECOND_KEY =
"hdds.container.scrub.volume.bytes.per.second";
+ public static final String ON_DEMAND_VOLUME_BYTES_PER_SECOND_KEY =
+ "hdds.container.scrub.on.demand.volume.bytes.per.second";
public static final long METADATA_SCAN_INTERVAL_DEFAULT =
Duration.ofHours(3).toMillis();
public static final long DATA_SCAN_INTERVAL_DEFAULT =
Duration.ofDays(7).toMillis();
- public static final long BANDWIDTH_PER_VOLUME_DEFAULT = 1048576; // 1MB
+
+ public static final long BANDWIDTH_PER_VOLUME_DEFAULT = OzoneConsts.MB * 5L;
+ public static final long ON_DEMAND_BANDWIDTH_PER_VOLUME_DEFAULT =
+ OzoneConsts.MB * 5L;
@Config(key = "enabled",
type = ConfigType.BOOLEAN,
@@ -80,12 +86,21 @@ public class ContainerScannerConfiguration {
@Config(key = "volume.bytes.per.second",
type = ConfigType.LONG,
- defaultValue = "1048576",
+ defaultValue = "5242880",
tags = {ConfigTag.STORAGE},
description = "Config parameter to throttle I/O bandwidth used"
+ " by scanner per volume.")
private long bandwidthPerVolume = BANDWIDTH_PER_VOLUME_DEFAULT;
+ @Config(key = "on.demand.volume.bytes.per.second",
+ type = ConfigType.LONG,
+ defaultValue = "5242880",
+ tags = {ConfigTag.STORAGE},
+ description = "Config parameter to throttle I/O bandwidth used"
+ + " by the demand container scanner per volume.")
+ private long onDemandBandwidthPerVolume
+ = ON_DEMAND_BANDWIDTH_PER_VOLUME_DEFAULT;
+
@PostConstruct
public void validate() {
if (metadataScanInterval < 0) {
@@ -108,6 +123,12 @@ public class ContainerScannerConfiguration {
bandwidthPerVolume, BANDWIDTH_PER_VOLUME_DEFAULT);
bandwidthPerVolume = BANDWIDTH_PER_VOLUME_DEFAULT;
}
+ if (onDemandBandwidthPerVolume < 0) {
+ LOG.warn(ON_DEMAND_VOLUME_BYTES_PER_SECOND_KEY +
+ " must be >= 0 and was set to {}. Defaulting to {}",
+ onDemandBandwidthPerVolume, ON_DEMAND_BANDWIDTH_PER_VOLUME_DEFAULT);
+ onDemandBandwidthPerVolume = ON_DEMAND_BANDWIDTH_PER_VOLUME_DEFAULT;
+ }
}
public void setEnabled(boolean enabled) {
@@ -137,4 +158,8 @@ public class ContainerScannerConfiguration {
public long getBandwidthPerVolume() {
return bandwidthPerVolume;
}
+
+ public long getOnDemandBandwidthPerVolume() {
+ return onDemandBandwidthPerVolume;
+ }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerScanner.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerScanner.java
new file mode 100644
index 0000000000..69c8ba3fee
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerScanner.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.util.Canceler;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class for performing on demand scans of containers.
+ */
+public final class OnDemandContainerScanner {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(OnDemandContainerScanner.class);
+
+ private static volatile OnDemandContainerScanner instance;
+
+ private final ExecutorService scanExecutor;
+ private final ContainerController containerController;
+ private final DataTransferThrottler throttler;
+ private final Canceler canceler;
+ private final ConcurrentHashMap
+ .KeySetView<Long, Boolean> containerRescheduleCheckSet;
+ private final OnDemandScannerMetrics metrics;
+
+ private OnDemandContainerScanner(
+ ContainerScannerConfiguration conf, ContainerController controller) {
+ containerController = controller;
+ throttler = new DataTransferThrottler(
+ conf.getOnDemandBandwidthPerVolume());
+ canceler = new Canceler();
+ metrics = OnDemandScannerMetrics.create();
+ scanExecutor = Executors.newSingleThreadExecutor();
+ containerRescheduleCheckSet = ConcurrentHashMap.newKeySet();
+ }
+
+ public static synchronized void init(
+ ContainerScannerConfiguration conf, ContainerController controller) {
+ if (instance != null) {
+ LOG.warn("Trying to initialize on demand scanner" +
+ " a second time on a datanode.");
+ return;
+ }
+ instance = new OnDemandContainerScanner(conf, controller);
+ }
+
+ public static Optional<Future<?>> scanContainer(Container<?> container) {
+ if (instance == null || !container.shouldScanData()) {
+ return Optional.empty();
+ }
+ Future<?> resultFuture = null;
+ long containerId = container.getContainerData().getContainerID();
+ if (addContainerToScheduledContainers(containerId)) {
+ resultFuture = instance.scanExecutor.submit(() -> {
+ if (container.shouldScanData()) {
+ performOnDemandScan(container);
+ }
+ removeContainerFromScheduledContainers(containerId);
+ });
+ }
+ return Optional.ofNullable(resultFuture);
+ }
+
+ private static boolean addContainerToScheduledContainers(long containerId) {
+ return instance.containerRescheduleCheckSet.add(containerId);
+ }
+
+ private static void removeContainerFromScheduledContainers(
+ long containerId) {
+ instance.containerRescheduleCheckSet.remove(containerId);
+ }
+
+ private static void performOnDemandScan(Container<?> container) {
+ long containerId = container.getContainerData().getContainerID();
+ try {
+ ContainerData containerData = container.getContainerData();
+ logScanStart(containerData);
+ if (container.scanData(instance.throttler, instance.canceler)) {
+ Instant now = Instant.now();
+ logScanCompleted(containerData, now);
+ instance.containerController.updateDataScanTimestamp(containerId, now);
+ } else {
+ instance.containerController.markContainerUnhealthy(containerId);
+ instance.metrics.incNumUnHealthyContainers();
+ }
+ instance.metrics.incNumContainersScanned();
+ } catch (IOException e) {
+ LOG.warn("Unexpected exception while scanning container "
+ + containerId, e);
+ }
+ }
+
+ private static void logScanStart(ContainerData containerData) {
+ if (LOG.isDebugEnabled()) {
+ Optional<Instant> scanTimestamp = containerData.lastDataScanTime();
+ Object lastScanTime = scanTimestamp.map(ts -> "at " + ts).orElse("never");
+ LOG.debug("Scanning container {}, last scanned {}",
+ containerData.getContainerID(), lastScanTime);
+ }
+ }
+
+ private static void logScanCompleted(
+ ContainerData containerData, Instant timestamp) {
+ LOG.debug("Completed scan of container {} at {}",
+ containerData.getContainerID(), timestamp);
+ }
+
+ public static OnDemandScannerMetrics getMetrics() {
+ return instance.metrics;
+ }
+
+ @VisibleForTesting
+ public static DataTransferThrottler getThrottler() {
+ return instance.throttler;
+ }
+
+ @VisibleForTesting
+ public static Canceler getCanceler() {
+ return instance.canceler;
+ }
+
+ public static synchronized void shutdown() {
+ if (instance == null) {
+ return;
+ }
+ instance.shutdownScanner();
+ }
+
+ private synchronized void shutdownScanner() {
+ instance = null;
+ metrics.unregister();
+ this.canceler.cancel("On-demand container" +
+ " scanner is shutting down.");
+ if (!scanExecutor.isShutdown()) {
+ scanExecutor.shutdown();
+ }
+ try {
+ long timeoutSeconds = 5;
+ if (!scanExecutor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
+ LOG.warn("On demand scanner shut down forcefully after {} seconds",
+ timeoutSeconds);
+ scanExecutor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("On demand scanner interrupted while waiting for shut down.");
+ scanExecutor.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandScannerMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandScannerMetrics.java
new file mode 100644
index 0000000000..473713704c
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandScannerMetrics.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+
+/**
+ * This class captures the on-demand container data scanner metrics.
+ **/
+@InterfaceAudience.Private
+@Metrics(about = "On-demand container data scanner metrics", context = "dfs")
+public final class OnDemandScannerMetrics
+ extends AbstractContainerScannerMetrics {
+
+ private OnDemandScannerMetrics(String name, MetricsSystem ms) {
+ super(name, ms);
+ }
+
+ public static OnDemandScannerMetrics create() {
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ String name = "On-demand container scanner metrics";
+ return ms.register(name, null, new OnDemandScannerMetrics(name, ms));
+ }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 2ceb5d916a..5d2153465e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -84,6 +84,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_RECOVERING_CONTAINER
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_RECOVERING_CONTAINER_SCRUBBING_SERVICE_WORKERS_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_RECOVERING_CONTAINER_TIMEOUT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_RECOVERING_CONTAINER_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration.ON_DEMAND_VOLUME_BYTES_PER_SECOND_KEY;
import static org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration.VOLUME_BYTES_PER_SECOND_KEY;
/**
@@ -307,30 +308,45 @@ public class OzoneContainer {
private void startContainerScrub() {
ContainerScannerConfiguration c = config.getObject(
ContainerScannerConfiguration.class);
- boolean enabled = c.isEnabled();
+ if (!c.isEnabled()) {
+ LOG.info("Scheduled background container scanners and " +
+ "the on-demand container scanner have been disabled.");
+ return;
+ }
+ initOnDemandContainerScanner(c);
+ initMetadataScanner(c);
+ initContainerScanner(c);
+ }
- if (!enabled) {
- LOG.info("Background container scanner has been disabled.");
- } else {
- if (this.metadataScanner == null) {
- this.metadataScanner = new ContainerMetadataScanner(c, controller);
- }
- this.metadataScanner.start();
+ private void initContainerScanner(ContainerScannerConfiguration c) {
+ if (c.getBandwidthPerVolume() == 0L) {
+ LOG.warn(VOLUME_BYTES_PER_SECOND_KEY + " is set to 0, " +
+ "so background container data scanner will not start.");
+ return;
+ }
+ dataScanners = new ArrayList<>();
+ for (StorageVolume v : volumeSet.getVolumesList()) {
+ ContainerDataScanner s = new ContainerDataScanner(c, controller,
+ (HddsVolume) v);
+ s.start();
+ dataScanners.add(s);
+ }
+ }
- if (c.getBandwidthPerVolume() == 0L) {
- LOG.warn(VOLUME_BYTES_PER_SECOND_KEY + " is set to 0, " +
- "so background container data scanner will not start.");
- return;
- }
+ private void initMetadataScanner(ContainerScannerConfiguration c) {
+ if (this.metadataScanner == null) {
+ this.metadataScanner = new ContainerMetadataScanner(c, controller);
+ }
+ this.metadataScanner.start();
+ }
- dataScanners = new ArrayList<>();
- for (StorageVolume v : volumeSet.getVolumesList()) {
- ContainerDataScanner s = new ContainerDataScanner(c, controller,
- (HddsVolume) v);
- s.start();
- dataScanners.add(s);
- }
+ private void initOnDemandContainerScanner(ContainerScannerConfiguration c) {
+ if (c.getOnDemandBandwidthPerVolume() == 0L) {
+ LOG.warn(ON_DEMAND_VOLUME_BYTES_PER_SECOND_KEY + " is set to 0, " +
+ "so the on-demand container data scanner will not start.");
+ return;
}
+ OnDemandContainerScanner.init(c, controller);
}
/**
@@ -349,6 +365,7 @@ public class OzoneContainer {
for (ContainerDataScanner s : dataScanners) {
s.shutdown();
}
+ OnDemandContainerScanner.shutdown();
}
/**
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
index a60f0d52ae..a216075889 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
@@ -17,11 +17,6 @@
package org.apache.hadoop.ozone.container.common;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Random;
-import java.util.UUID;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -29,11 +24,15 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
+import org.apache.hadoop.hdfs.util.Canceler;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
@@ -47,9 +46,18 @@ import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
import org.apache.hadoop.security.UserGroupInformation;
-
import org.mockito.Mockito;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* Helper utility to test containers.
*/
@@ -157,4 +165,23 @@ public final class ContainerTestUtils {
null, null);
}
}
+
+ public static void setupMockContainer(
+ Container<ContainerData> c, boolean shouldScanData,
+ boolean scanMetaDataSuccess, boolean scanDataSuccess,
+ AtomicLong containerIdSeq) {
+ setupMockContainer(c, shouldScanData, scanDataSuccess, containerIdSeq);
+ when(c.scanMetaData()).thenReturn(scanMetaDataSuccess);
+ }
+
+ public static void setupMockContainer(
+ Container<ContainerData> c, boolean shouldScanData,
+ boolean scanDataSuccess, AtomicLong containerIdSeq) {
+ ContainerData data = mock(ContainerData.class);
+ when(data.getContainerID()).thenReturn(containerIdSeq.getAndIncrement());
+ when(c.getContainerData()).thenReturn(data);
+ when(c.shouldScanData()).thenReturn(shouldScanData);
+ when(c.scanData(any(DataTransferThrottler.class), any(Canceler.class)))
+ .thenReturn(scanDataSuccess);
+ }
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannerConfiguration.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannerConfiguration.java
index 6a4607c845..077acbb9d9 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannerConfiguration.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannerConfiguration.java
@@ -29,8 +29,11 @@ import static org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfig
import static org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration.DATA_SCAN_INTERVAL_KEY;
import static org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration.METADATA_SCAN_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration.METADATA_SCAN_INTERVAL_KEY;
+import static org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration.ON_DEMAND_BANDWIDTH_PER_VOLUME_DEFAULT;
+import static org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration.ON_DEMAND_VOLUME_BYTES_PER_SECOND_KEY;
import static org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration.VOLUME_BYTES_PER_SECOND_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
/**
* Test for {@link ContainerScannerConfiguration}.
@@ -48,10 +51,12 @@ public class TestContainerScannerConfiguration {
public void acceptsValidValues() {
long validInterval = Duration.ofHours(1).toMillis();
long validBandwidth = (long) StorageUnit.MB.toBytes(1);
+ long validOnDemandBandwidth = (long) StorageUnit.MB.toBytes(2);
conf.setLong(METADATA_SCAN_INTERVAL_KEY, validInterval);
conf.setLong(DATA_SCAN_INTERVAL_KEY, validInterval);
conf.setLong(VOLUME_BYTES_PER_SECOND_KEY, validBandwidth);
+ conf.setLong(ON_DEMAND_VOLUME_BYTES_PER_SECOND_KEY, validOnDemandBandwidth);
ContainerScannerConfiguration csConf =
conf.getObject(ContainerScannerConfiguration.class);
@@ -59,6 +64,8 @@ public class TestContainerScannerConfiguration {
assertEquals(validInterval, csConf.getMetadataScanInterval());
assertEquals(validInterval, csConf.getDataScanInterval());
assertEquals(validBandwidth, csConf.getBandwidthPerVolume());
+ assertEquals(validOnDemandBandwidth,
+ csConf.getOnDemandBandwidthPerVolume());
}
@Test
@@ -69,6 +76,7 @@ public class TestContainerScannerConfiguration {
conf.setLong(METADATA_SCAN_INTERVAL_KEY, invalidInterval);
conf.setLong(DATA_SCAN_INTERVAL_KEY, invalidInterval);
conf.setLong(VOLUME_BYTES_PER_SECOND_KEY, invalidBandwidth);
+ conf.setLong(ON_DEMAND_VOLUME_BYTES_PER_SECOND_KEY, invalidBandwidth);
ContainerScannerConfiguration csConf =
conf.getObject(ContainerScannerConfiguration.class);
@@ -79,6 +87,8 @@ public class TestContainerScannerConfiguration {
csConf.getDataScanInterval());
assertEquals(BANDWIDTH_PER_VOLUME_DEFAULT,
csConf.getBandwidthPerVolume());
+ assertEquals(ON_DEMAND_BANDWIDTH_PER_VOLUME_DEFAULT,
+ csConf.getOnDemandBandwidthPerVolume());
}
@Test
@@ -86,12 +96,14 @@ public class TestContainerScannerConfiguration {
ContainerScannerConfiguration csConf =
conf.getObject(ContainerScannerConfiguration.class);
- assertEquals(false, csConf.isEnabled());
+ assertFalse(csConf.isEnabled());
assertEquals(METADATA_SCAN_INTERVAL_DEFAULT,
csConf.getMetadataScanInterval());
assertEquals(DATA_SCAN_INTERVAL_DEFAULT,
csConf.getDataScanInterval());
assertEquals(BANDWIDTH_PER_VOLUME_DEFAULT,
csConf.getBandwidthPerVolume());
+ assertEquals(ON_DEMAND_BANDWIDTH_PER_VOLUME_DEFAULT,
+ csConf.getOnDemandBandwidthPerVolume());
}
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannerMetrics.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannerMetrics.java
index cc8a60c9ca..6d9eb31db1 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannerMetrics.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerScannerMetrics.java
@@ -17,12 +17,13 @@
*/
package org.apache.hadoop.ozone.container.ozoneimpl;
-import org.apache.hadoop.hdfs.util.Canceler;
-import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.commons.compress.utils.Lists;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -32,15 +33,18 @@ import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.hadoop.hdds.conf.OzoneConfiguration.newInstanceOf;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -73,9 +77,15 @@ public class TestContainerScannerMetrics {
conf = newInstanceOf(ContainerScannerConfiguration.class);
conf.setMetadataScanInterval(0);
conf.setDataScanInterval(0);
+ conf.setEnabled(true);
controller = mockContainerController();
}
+ @AfterEach
+ public void tearDown() {
+ OnDemandContainerScanner.shutdown();
+ }
+
@Test
public void testContainerMetaDataScannerMetrics() {
ContainerMetadataScanner subject =
@@ -129,15 +139,54 @@ public class TestContainerScannerMetrics {
assertNull(DefaultMetricsSystem.instance().getSource(name));
}
+ @Test
+ public void testOnDemandScannerMetrics() throws Exception {
+ OnDemandContainerScanner.init(conf, controller);
+ ArrayList<Optional<Future<?>>> resultFutureList = Lists.newArrayList();
+ resultFutureList.add(OnDemandContainerScanner.scanContainer(corruptData));
+ resultFutureList.add(
+ OnDemandContainerScanner.scanContainer(corruptMetadata));
+ resultFutureList.add(OnDemandContainerScanner.scanContainer(healthy));
+ waitOnScannerToFinish(resultFutureList);
+ OnDemandScannerMetrics metrics = OnDemandContainerScanner.getMetrics();
+ //Containers with shouldScanData = false shouldn't increase
+ // the number of scanned containers
+ assertEquals(1, metrics.getNumUnHealthyContainers());
+ assertEquals(2, metrics.getNumContainersScanned());
+ }
+
+ private void waitOnScannerToFinish(
+ ArrayList<Optional<Future<?>>> resultFutureList)
+ throws ExecutionException, InterruptedException {
+ for (Optional<Future<?>> future : resultFutureList) {
+ if (future.isPresent()) {
+ future.get().get();
+ }
+ }
+ }
+
+ @Test
+ public void testOnDemandScannerMetricsUnregisters() {
+ OnDemandContainerScanner.init(conf, controller);
+ String metricsName = OnDemandContainerScanner.getMetrics().getName();
+ assertNotNull(DefaultMetricsSystem.instance().getSource(metricsName));
+ OnDemandContainerScanner.shutdown();
+ OnDemandContainerScanner.scanContainer(healthy);
+ assertNull(DefaultMetricsSystem.instance().getSource(metricsName));
+ }
+
private ContainerController mockContainerController() {
// healthy container
- setupMockContainer(healthy, true, true, true);
+ ContainerTestUtils.setupMockContainer(healthy,
+ true, true, true, containerIdSeq);
// unhealthy container (corrupt data)
- setupMockContainer(corruptData, true, true, false);
+ ContainerTestUtils.setupMockContainer(corruptData,
+ true, true, false, containerIdSeq);
// unhealthy container (corrupt metadata)
- setupMockContainer(corruptMetadata, false, false, false);
+ ContainerTestUtils.setupMockContainer(corruptMetadata,
+ false, false, false, containerIdSeq);
Collection<Container<?>> containers = Arrays.asList(
healthy, corruptData, corruptMetadata);
@@ -147,17 +196,4 @@ public class TestContainerScannerMetrics {
return mock;
}
-
- private void setupMockContainer(
- Container<ContainerData> c, boolean shouldScanData,
- boolean scanMetaDataSuccess, boolean scanDataSuccess) {
- ContainerData data = mock(ContainerData.class);
- when(data.getContainerID()).thenReturn(containerIdSeq.getAndIncrement());
- when(c.getContainerData()).thenReturn(data);
- when(c.shouldScanData()).thenReturn(shouldScanData);
- when(c.scanMetaData()).thenReturn(scanMetaDataSuccess);
- when(c.scanData(any(DataTransferThrottler.class), any(Canceler.class)))
- .thenReturn(scanDataSuccess);
- }
-
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerScanner.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerScanner.java
new file mode 100644
index 0000000000..c2686b559c
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerScanner.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+import org.mockito.verification.VerificationMode;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.hdds.conf.OzoneConfiguration.newInstanceOf;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+
/**
 * Unit tests for the on-demand container scanner: verifies the
 * init/shutdown life-cycle, de-duplication of concurrent scan requests for
 * the same container, and that a failed data scan marks the container
 * unhealthy via the controller.
 */
@RunWith(MockitoJUnitRunner.class)
public class TestOnDemandContainerScanner {

  // Sequence used to hand out unique container ids to the mocks.
  private final AtomicLong containerIdSeq = new AtomicLong(100);

  // Container whose scan succeeds; must never be marked unhealthy.
  @Mock
  private Container<ContainerData> healthy;

  // Container that is not eligible for a data scan; must be skipped
  // and never marked unhealthy.
  @Mock
  private Container<ContainerData> openContainer;

  // Container whose data scan fails; must be marked unhealthy.
  @Mock
  private Container<ContainerData> corruptData;

  private ContainerScannerConfiguration conf;
  private ContainerController controller;

  @Before
  public void setup() {
    conf = newInstanceOf(ContainerScannerConfiguration.class);
    conf.setMetadataScanInterval(0);
    conf.setDataScanInterval(0);
    controller = mockContainerController();
  }

  @After
  public void tearDown() {
    // The scanner is driven through static methods; shut it down after
    // every test so no scanner state leaks between test cases.
    OnDemandContainerScanner.shutdown();
  }

  @Test
  public void testOnDemandContainerScanner() throws Exception {
    // Without initialization,
    // there shouldn't be interaction with containerController
    OnDemandContainerScanner.scanContainer(corruptData);
    Mockito.verifyZeroInteractions(controller);
    OnDemandContainerScanner.init(conf, controller);
    testContainerMarkedUnhealthy(healthy, never());
    testContainerMarkedUnhealthy(corruptData, atLeastOnce());
    testContainerMarkedUnhealthy(openContainer, never());
  }

  @Test
  public void testContainerScannerMultipleInitsAndShutdowns() throws Exception {
    // Repeated init/shutdown calls must be idempotent.
    OnDemandContainerScanner.init(conf, controller);
    OnDemandContainerScanner.init(conf, controller);
    OnDemandContainerScanner.shutdown();
    OnDemandContainerScanner.shutdown();
    //There shouldn't be an interaction after shutdown:
    testContainerMarkedUnhealthy(corruptData, never());
  }

  @Test
  public void testSameContainerQueuedMultipleTimes() throws Exception {
    OnDemandContainerScanner.init(conf, controller);
    //Given a container that has not finished scanning
    CountDownLatch latch = new CountDownLatch(1);
    Mockito.lenient().when(corruptData.scanData(
        OnDemandContainerScanner.getThrottler(),
        OnDemandContainerScanner.getCanceler()))
        .thenAnswer((Answer<Boolean>) invocation -> {
          latch.await();
          return false;
        });
    Optional<Future<?>> onGoingScan = OnDemandContainerScanner
        .scanContainer(corruptData);
    Assert.assertTrue(onGoingScan.isPresent());
    Assert.assertFalse(onGoingScan.get().isDone());
    //When scheduling the same container again
    Optional<Future<?>> secondScan = OnDemandContainerScanner
        .scanContainer(corruptData);
    //Then the second scan is not scheduled and the first scan can still finish
    Assert.assertFalse(secondScan.isPresent());
    latch.countDown();
    onGoingScan.get().get();
    Mockito.verify(controller, atLeastOnce()).
        markContainerUnhealthy(corruptData.getContainerData().getContainerID());
  }

  /**
   * Schedules a scan for the container, waits for it to finish if one was
   * scheduled at all, then verifies how many times the controller marked
   * the container unhealthy.
   */
  private void testContainerMarkedUnhealthy(
      Container<?> container, VerificationMode invocationTimes)
      throws InterruptedException, ExecutionException, IOException {
    Optional<Future<?>> result =
        OnDemandContainerScanner.scanContainer(container);
    if (result.isPresent()) {
      result.get().get();
    }
    Mockito.verify(controller, invocationTimes).markContainerUnhealthy(
        container.getContainerData().getContainerID());
  }

  private ContainerController mockContainerController() {
    // healthy container
    ContainerTestUtils.setupMockContainer(healthy,
        true, true, containerIdSeq);

    // unhealthy container (corrupt data)
    ContainerTestUtils.setupMockContainer(corruptData,
        true, false, containerIdSeq);

    // open container: mocked as not eligible for scanning, so the tests
    // expect it never to be marked unhealthy
    ContainerTestUtils.setupMockContainer(openContainer,
        false, false, containerIdSeq);

    return mock(ContainerController.class);
  }
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestDataScanner.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestDataScanner.java
index 0075982eb9..f0ad740c45 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestDataScanner.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scanner/TestDataScanner.java
@@ -20,39 +20,35 @@
package org.apache.hadoop.ozone.dn.scanner;
import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.client.ReplicationFactor;
-import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
-import org.apache.hadoop.ozone.client.OzoneKey;
import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.TestHelper;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerMetadataScanner;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -62,14 +58,15 @@ import org.junit.rules.Timeout;
import java.io.File;
import java.io.IOException;
-import java.time.Instant;
-import java.util.HashMap;
import java.util.Set;
import java.util.UUID;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
import static org.apache.hadoop.hdds.client.ReplicationType.RATIS;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.OPEN;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
/**
* This class tests the data scanner functionality.
@@ -85,7 +82,6 @@ public class TestDataScanner {
private static OzoneConfiguration ozoneConfig;
private static OzoneClient ozClient = null;
private static ObjectStore store = null;
- private static OzoneManager ozoneManager;
private static StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
@@ -93,6 +89,8 @@ public class TestDataScanner {
public static void init() throws Exception {
ozoneConfig = new OzoneConfiguration();
ozoneConfig.set(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, "1s");
+ ozoneConfig.set(ContainerScannerConfiguration.HDDS_CONTAINER_SCRUB_ENABLED,
+ String.valueOf(true));
ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
SCMContainerPlacementCapacity.class, PlacementPolicy.class);
ozoneConfig.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
@@ -103,7 +101,6 @@ public class TestDataScanner {
cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.ONE, 30000);
ozClient = OzoneClientFactory.getRpcClient(ozoneConfig);
store = ozClient.getObjectStore();
- ozoneManager = cluster.getOzoneManager();
storageContainerLocationClient =
cluster.getStorageContainerLocationClient();
}
@@ -121,111 +118,123 @@ public class TestDataScanner {
}
}
+  //This test covers two separate scenarios in a single run because
+  // creating and running a cluster is expensive.
@Test
- public void testOpenContainerIntegrity() throws Exception {
+ public void testScannersMarkContainerUnhealthy() throws Exception {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
- Instant testStartTime = Instant.now();
-
- String value = "sample value";
+ String value = "sample key value";
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
volume.createBucket(bucketName);
OzoneBucket bucket = volume.getBucket(bucketName);
- for (int i = 0; i < 10; i++) {
- String keyName = UUID.randomUUID().toString();
-
- OzoneOutputStream out = bucket.createKey(keyName,
- value.getBytes(UTF_8).length, RATIS,
- ONE, new HashMap<>());
- out.write(value.getBytes(UTF_8));
- out.close();
- OzoneKey key = bucket.getKey(keyName);
- Assert.assertEquals(keyName, key.getName());
- OzoneInputStream is = bucket.readKey(keyName);
- byte[] fileContent = new byte[value.getBytes(UTF_8).length];
- is.read(fileContent);
- Assert.assertTrue(verifyRatisReplication(volumeName, bucketName,
- keyName, RATIS,
- ONE));
- Assert.assertEquals(value, new String(fileContent, UTF_8));
- Assert.assertFalse(key.getCreationTime().isBefore(testStartTime));
- Assert.assertFalse(key.getModificationTime().isBefore(testStartTime));
- }
-
+ String keyNameInClosedContainer = "keyNameInClosedContainer";
+ OzoneOutputStream key = createKey(volumeName, bucketName,
+ keyNameInClosedContainer);
+ // write data more than 1 chunk
+ int sizeLargerThanOneChunk = (int) (OzoneConsts.MB + OzoneConsts.MB / 2);
+ byte[] data = ContainerTestHelper
+ .getFixedLengthString(value, sizeLargerThanOneChunk)
+ .getBytes(UTF_8);
+ key.write(data);
+
+ Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+ key.flush();
+ TestHelper.waitForContainerClose(key, cluster);
+ key.close();
+ String keyNameInOpenContainer = "keyNameInOpenContainer";
+ OzoneOutputStream key2 = createKey(volumeName, bucketName,
+ keyNameInOpenContainer);
+ key2.write(data);
+ key2.close();
// wait for the container report to propagate to SCM
Thread.sleep(5000);
-
Assert.assertEquals(1, cluster.getHddsDatanodes().size());
HddsDatanodeService dn = cluster.getHddsDatanodes().get(0);
OzoneContainer oc = dn.getDatanodeStateMachine().getContainer();
- ContainerSet cs = oc.getContainerSet();
- Container c = cs.getContainerIterator().next();
+ ContainerSet containerSet = oc.getContainerSet();
+ //Given an open and a closed container
+ Assert.assertTrue(containerSet.containerCount() > 1);
+ Container<?> openContainer = getContainerInState(containerSet, OPEN);
+ Container<?> closedContainer = getContainerInState(containerSet, CLOSED);
- Assert.assertTrue(cs.containerCount() > 0);
-
- // delete the chunks directory.
- File chunksDir = new File(c.getContainerData().getContainerPath(),
- "chunks");
- deleteDirectory(chunksDir);
- Assert.assertFalse(chunksDir.exists());
+ //When deleting their metadata to make them unhealthy and scanning them
+ deleteChunksDirForContainer(openContainer);
+ deleteChunksDirForContainer(closedContainer);
ContainerScannerConfiguration conf = ozoneConfig.getObject(
ContainerScannerConfiguration.class);
ContainerMetadataScanner sb = new ContainerMetadataScanner(conf,
oc.getController());
- sb.scanContainer(c);
-
+ //Scan the open container and trigger on-demand scan for the closed one
+ sb.scanContainer(openContainer);
+ tryReadKeyWithMissingChunksDir(bucket, keyNameInClosedContainer);
// wait for the incremental container report to propagate to SCM
Thread.sleep(5000);
ContainerManager cm = cluster.getStorageContainerManager()
.getContainerManager();
- Set<ContainerReplica> replicas = cm.getContainerReplicas(
- ContainerID.valueOf(c.getContainerData().getContainerID()));
- Assert.assertEquals(1, replicas.size());
- ContainerReplica r = replicas.iterator().next();
- Assert.assertEquals(StorageContainerDatanodeProtocolProtos.
- ContainerReplicaProto.State.UNHEALTHY, r.getState());
+ ContainerReplica openContainerReplica = getContainerReplica(
+ cm, openContainer.getContainerData().getContainerID());
+ ContainerReplica closedContainerReplica = getContainerReplica(
+ cm, closedContainer.getContainerData().getContainerID());
+ //Then both containers are marked unhealthy
+ Assert.assertEquals(State.UNHEALTHY, openContainerReplica.getState());
+ Assert.assertEquals(State.UNHEALTHY, closedContainerReplica.getState());
+ }
+
+ private ContainerReplica getContainerReplica(
+ ContainerManager cm, long containerId) throws ContainerNotFoundException {
+ Set<ContainerReplica> containerReplicas = cm.getContainerReplicas(
+ ContainerID.valueOf(
+ containerId));
+ Assert.assertEquals(1, containerReplicas.size());
+ return containerReplicas.iterator().next();
+ }
+
  // Ignore the result of the key read because it is expected to fail: the
  // chunks directory backing the key was deleted, so the read must surface
  // an IOException. Reading is also what triggers the on-demand scan of
  // the closed container (see caller).
  @SuppressWarnings("ResultOfMethodCallIgnored")
  private void tryReadKeyWithMissingChunksDir(
      OzoneBucket bucket, String keyNameInClosedContainer) throws IOException {
    try (OzoneInputStream key = bucket.readKey(keyNameInClosedContainer)) {
      Assert.assertThrows(IOException.class, key::read);
    }
  }
- boolean deleteDirectory(File directoryToBeDeleted) {
  /**
   * Deletes the "chunks" directory under the container's on-disk path,
   * corrupting the container so a scan will mark it unhealthy.
   */
  private void deleteChunksDirForContainer(Container<?> container) {
    File chunksDir = new File(container.getContainerData().getContainerPath(),
        "chunks");
    deleteDirectory(chunksDir);
    Assert.assertFalse(chunksDir.exists());
  }
+
+ private Container<?> getContainerInState(
+ ContainerSet cs, ContainerProtos.ContainerDataProto.State state) {
+ return cs.getContainerMap().values().stream()
+ .filter(c -> state ==
+ c.getContainerState())
+ .findAny()
+ .orElseThrow(() ->
+ new RuntimeException("No Open container found for testing"));
+ }
+
  /** Creates a RATIS/ONE replicated key and returns its open output stream. */
  private OzoneOutputStream createKey(String volumeName, String bucketName,
      String keyName) throws Exception {
    return TestHelper.createKey(
        keyName, RATIS, ONE, 0, store, volumeName, bucketName);
  }
+
+ void deleteDirectory(File directoryToBeDeleted) {
File[] allContents = directoryToBeDeleted.listFiles();
if (allContents != null) {
for (File file : allContents) {
deleteDirectory(file);
}
}
- return directoryToBeDeleted.delete();
- }
-
- private boolean verifyRatisReplication(String volumeName, String bucketName,
- String keyName, ReplicationType type,
- ReplicationFactor factor)
- throws IOException {
- OmKeyArgs keyArgs = new OmKeyArgs.Builder()
- .setVolumeName(volumeName)
- .setBucketName(bucketName)
- .setKeyName(keyName)
- .build();
- HddsProtos.ReplicationType replicationType =
- HddsProtos.ReplicationType.valueOf(type.toString());
- HddsProtos.ReplicationFactor replicationFactor =
- HddsProtos.ReplicationFactor.valueOf(factor.getValue());
- OmKeyInfo keyInfo = ozoneManager.lookupKey(keyArgs);
- for (OmKeyLocationInfo info :
- keyInfo.getLatestVersionLocations().getLocationList()) {
- ContainerInfo container =
- storageContainerLocationClient.getContainer(info.getContainerID());
- if (!ReplicationConfig.getLegacyFactor(container.getReplicationConfig())
- .equals(replicationFactor) || (
- container.getReplicationType() != replicationType)) {
- return false;
- }
- }
- return true;
+ directoryToBeDeleted.delete();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org