You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sa...@apache.org on 2022/05/17 04:42:43 UTC
[ozone] branch HDDS-3630 updated: HDDS-6597. [Merge rocksdb in datanode] Non-rolling upgrade supports container Schema V3. (#3392)
This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-3630
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3630 by this push:
new 3178ba5a94 HDDS-6597. [Merge rocksdb in datanode] Non-rolling upgrade supports container Schema V3. (#3392)
3178ba5a94 is described below
commit 3178ba5a94564799934d971ad66413a1d22b41f3
Author: Sammi Chen <sa...@apache.org>
AuthorDate: Tue May 17 12:42:39 2022 +0800
HDDS-6597. [Merge rocksdb in datanode] Non-rolling upgrade supports container Schema V3. (#3392)
---
.../upgrade/AbstractLayoutVersionManager.java | 12 +-
.../common/statemachine/DatanodeStateMachine.java | 7 +
.../states/endpoint/VersionEndpointTask.java | 8 +-
.../ozone/container/common/volume/HddsVolume.java | 63 +-
.../container/common/volume/MutableVolumeSet.java | 5 +
.../container/common/volume/StorageVolume.java | 7 +
.../ozone/container/ozoneimpl/OzoneContainer.java | 17 +-
.../upgrade/DatanodeSchemaV3FinalizeAction.java | 82 +++
.../upgrade/TestDatanodeUpgradeToSchemaV3.java | 759 +++++++++++++++++++++
9 files changed, 928 insertions(+), 32 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
index 2ce6e02409..ccf7c1fba8 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
@@ -37,6 +37,8 @@ import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.management.ObjectName;
+
/**
* Layout Version Manager containing generic method implementations.
*/
@@ -58,6 +60,7 @@ public abstract class AbstractLayoutVersionManager<T extends LayoutFeature>
// Note that MLV may have been incremented during the upgrade
// by the time the value is read/used.
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private ObjectName mBean;
protected void init(int version, T[] lfs) throws IOException {
lock.writeLock().lock();
@@ -81,7 +84,7 @@ public abstract class AbstractLayoutVersionManager<T extends LayoutFeature>
mlvFeature, mlvFeature.layoutVersion(),
slvFeature, slvFeature.layoutVersion());
- MBeans.register("LayoutVersionManager",
+ mBean = MBeans.register("LayoutVersionManager",
getClass().getSimpleName(), this);
} finally {
lock.writeLock().unlock();
@@ -215,4 +218,11 @@ public abstract class AbstractLayoutVersionManager<T extends LayoutFeature>
lock.readLock().unlock();
}
}
+
+ public void close() {
+ if (mBean != null) {
+ MBeans.unregister(mBean);
+ mBean = null;
+ }
+ }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index e58418fc76..63b3055364 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -360,6 +360,9 @@ public class DatanodeStateMachine implements Closeable {
if (cmdProcessThread != null) {
cmdProcessThread.interrupt();
}
+ if (layoutVersionManager != null) {
+ layoutVersionManager.close();
+ }
context.setState(DatanodeStates.getLastState());
replicationSupervisorMetrics.unRegister();
executorService.shutdown();
@@ -680,4 +683,8 @@ public class DatanodeStateMachine implements Closeable {
public UpgradeFinalizer<DatanodeStateMachine> getUpgradeFinalizer() {
return upgradeFinalizer;
}
+
+ public ConfigurationSource getConf() {
+ return conf;
+ }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
index 9e0669c8e2..f9b0d882f0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures.SchemaV3;
import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -82,10 +81,9 @@ public class VersionEndpointTask implements
Preconditions.checkNotNull(clusterId,
"Reply from SCM: clusterId cannot be null");
- // Check DbVolumes
- if (SchemaV3.isFinalizedAndEnabled(configuration)) {
- checkVolumeSet(ozoneContainer.getDbVolumeSet(), scmId, clusterId);
- }
+ // Check DbVolumes, format DbVolume at first register time.
+ checkVolumeSet(ozoneContainer.getDbVolumeSet(), scmId, clusterId);
+
// Check HddsVolumes
checkVolumeSet(ozoneContainer.getVolumeSet(), scmId, clusterId);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
index 513882eb9e..9fc07840da 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
@@ -22,13 +22,17 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
+import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
import org.apache.hadoop.ozone.container.common.utils.DatanodeStoreCache;
+import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
+import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures.SchemaV3;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,6 +73,7 @@ public class HddsVolume extends StorageVolume {
// container db path. This is initialized only once together with dbVolume,
// and stored as a member to prevent spawning lots of File objects.
private File dbParentDir;
+ private AtomicBoolean dbLoaded = new AtomicBoolean(false);
/**
* Builder for HddsVolume.
@@ -114,7 +119,9 @@ public class HddsVolume extends StorageVolume {
MutableVolumeSet dbVolumeSet) throws IOException {
super.createWorkingDir(workingDirName, dbVolumeSet);
- if (SchemaV3.isFinalizedAndEnabled(getConf())) {
+ // Create DB store for a newly formatted volume
+ if (VersionedDatanodeFeatures.isFinalized(
+ HDDSLayoutFeature.DATANODE_SCHEMA_V3)) {
createDbStore(dbVolumeSet);
}
}
@@ -133,9 +140,7 @@ public class HddsVolume extends StorageVolume {
if (volumeIOStats != null) {
volumeIOStats.unregister();
}
- if (SchemaV3.isFinalizedAndEnabled(getConf())) {
- closeDbStore();
- }
+ closeDbStore();
}
@Override
@@ -144,9 +149,7 @@ public class HddsVolume extends StorageVolume {
if (volumeIOStats != null) {
volumeIOStats.unregister();
}
- if (SchemaV3.isFinalizedAndEnabled(getConf())) {
- closeDbStore();
- }
+ closeDbStore();
}
/**
@@ -178,6 +181,10 @@ public class HddsVolume extends StorageVolume {
return this.dbParentDir;
}
+ public boolean isDbLoaded() {
+ return dbLoaded.get();
+ }
+
public void loadDbStore() throws IOException {
// DN startup for the first time, not registered yet,
// so the DbVolume is not formatted.
@@ -185,6 +192,13 @@ public class HddsVolume extends StorageVolume {
return;
}
+ // DB is already loaded
+ if (dbLoaded.get()) {
+ LOG.warn("Schema V3 db is already loaded from {} for volume {}",
+ getDbParentDir(), getStorageID());
+ return;
+ }
+
File clusterIdDir = new File(dbVolume == null ?
getStorageDir() : dbVolume.getStorageDir(),
getClusterID());
@@ -214,6 +228,9 @@ public class HddsVolume extends StorageVolume {
}
dbParentDir = storageIdDir;
+ dbLoaded.set(true);
+ LOG.info("SchemaV3 db is loaded at {} for volume {}", containerDBPath,
+ getStorageID());
}
/**
@@ -221,21 +238,22 @@ public class HddsVolume extends StorageVolume {
* Use the HddsVolume directly if no DbVolume found.
* @param dbVolumeSet
*/
- public void createDbStore(MutableVolumeSet dbVolumeSet)
- throws IOException {
+ public void createDbStore(MutableVolumeSet dbVolumeSet) throws IOException {
DbVolume chosenDbVolume = null;
File clusterIdDir;
+ String workingDir = getWorkingDir() == null ? getClusterID() :
+ getWorkingDir();
if (dbVolumeSet == null || dbVolumeSet.getVolumesList().isEmpty()) {
// No extra db volumes specified, just create db under the HddsVolume.
- clusterIdDir = new File(getStorageDir(), getClusterID());
+ clusterIdDir = new File(getStorageDir(), workingDir);
} else {
// Randomly choose a DbVolume for simplicity.
List<DbVolume> dbVolumeList = StorageVolumeUtil.getDbVolumesList(
dbVolumeSet.getVolumesList());
chosenDbVolume = dbVolumeList.get(
ThreadLocalRandom.current().nextInt(dbVolumeList.size()));
- clusterIdDir = new File(chosenDbVolume.getStorageDir(), getClusterID());
+ clusterIdDir = new File(chosenDbVolume.getStorageDir(), workingDir);
}
if (!clusterIdDir.exists()) {
@@ -252,14 +270,19 @@ public class HddsVolume extends StorageVolume {
+ getStorageID());
}
- // Init the db instance for HddsVolume under the subdir above.
+ // Create the db instance for HddsVolume under the subdir above.
String containerDBPath = new File(storageIdDir, CONTAINER_DB_NAME)
.getAbsolutePath();
try {
- initPerDiskDBStore(containerDBPath, getConf());
+ HddsVolumeUtil.initPerDiskDBStore(containerDBPath, getConf());
+ dbLoaded.set(true);
+ LOG.info("SchemaV3 db is created and loaded at {} for volume {}",
+ containerDBPath, getStorageID());
} catch (IOException e) {
- throw new IOException("Can't init db instance under path "
- + containerDBPath + " for volume " + getStorageID());
+ String errMsg = "Can't create db instance under path "
+ + containerDBPath + " for volume " + getStorageID();
+ LOG.error(errMsg, e);
+ throw new IOException(errMsg);
}
// Set the dbVolume and dbParentDir of the HddsVolume for db path lookup.
@@ -268,15 +291,23 @@ public class HddsVolume extends StorageVolume {
if (chosenDbVolume != null) {
chosenDbVolume.addHddsDbStorePath(getStorageID(), containerDBPath);
}
+
+ // If SchemaV3 is disabled, close the DB instance
+ if (!SchemaV3.isFinalizedAndEnabled(getConf())) {
+ closeDbStore();
+ }
}
private void closeDbStore() {
- if (dbParentDir == null) {
+ if (!dbLoaded.get()) {
return;
}
String containerDBPath = new File(dbParentDir, CONTAINER_DB_NAME)
.getAbsolutePath();
DatanodeStoreCache.getInstance().removeDB(containerDBPath);
+ dbLoaded.set(false);
+ LOG.info("SchemaV3 db is stopped at {} for volume {}", containerDBPath,
+ getStorageID());
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java
index 78f0b9c4b7..22571685df 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java
@@ -454,6 +454,11 @@ public class MutableVolumeSet implements VolumeSet {
return ImmutableMap.copyOf(volumeMap);
}
+ @VisibleForTesting
+ public void setVolumeMap(Map<String, StorageVolume> map) {
+ this.volumeMap = map;
+ }
+
@VisibleForTesting
public Map<StorageType, List<StorageVolume>> getVolumeStateMap() {
return ImmutableMap.copyOf(volumeStateMap);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
index 18468f5a9d..18892b68a2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
@@ -109,6 +109,8 @@ public abstract class StorageVolume
private final VolumeSet volumeSet;
+ private String workingDir;
+
protected StorageVolume(Builder<?> b) throws IOException {
if (!b.failedVolume) {
StorageLocation location = StorageLocation.parse(b.volumeRootStr);
@@ -188,6 +190,7 @@ public abstract class StorageVolume
throw new IOException("Unable to create ID directory " + idDir +
" for datanode.");
}
+ this.workingDir = workingDirName;
}
private VolumeState analyzeVolumeState() {
@@ -381,6 +384,10 @@ public abstract class StorageVolume
return this.storageDir;
}
+ public String getWorkingDir() {
+ return this.workingDir;
+ }
+
public void refreshVolumeInfo() {
volumeInfo.refreshNow();
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 2709ea5e3e..df61cac960 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -123,10 +123,8 @@ public class OzoneContainer {
* @throws IOException
*/
public OzoneContainer(
- DatanodeDetails datanodeDetails, ConfigurationSource
- conf, StateContext context, CertificateClient certClient
- )
- throws IOException {
+ DatanodeDetails datanodeDetails, ConfigurationSource conf,
+ StateContext context, CertificateClient certClient) throws IOException {
config = conf;
this.datanodeDetails = datanodeDetails;
this.context = context;
@@ -137,13 +135,12 @@ public class OzoneContainer {
volumeSet.setFailedVolumeListener(this::handleVolumeFailures);
metaVolumeSet = new MutableVolumeSet(datanodeDetails.getUuidString(), conf,
context, VolumeType.META_VOLUME, volumeChecker);
- if (SchemaV3.isFinalizedAndEnabled(conf)) {
- dbVolumeSet = HddsServerUtil.getDatanodeDbDirs(conf).isEmpty() ? null :
- new MutableVolumeSet(datanodeDetails.getUuidString(), conf,
- context, VolumeType.DB_VOLUME, volumeChecker);
+
+ dbVolumeSet = HddsServerUtil.getDatanodeDbDirs(conf).isEmpty() ? null :
+ new MutableVolumeSet(datanodeDetails.getUuidString(), conf,
+ context, VolumeType.DB_VOLUME, volumeChecker);
+ if (SchemaV3.isFinalizedAndEnabled(config)) {
HddsVolumeUtil.loadAllHddsVolumeDbStore(volumeSet, dbVolumeSet, LOG);
- } else {
- dbVolumeSet = null;
}
containerSet = new ContainerSet();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DatanodeSchemaV3FinalizeAction.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DatanodeSchemaV3FinalizeAction.java
new file mode 100644
index 0000000000..7436284c72
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DatanodeSchemaV3FinalizeAction.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.upgrade;
+
+import org.apache.hadoop.hdds.upgrade.HDDSUpgradeAction;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
+import org.apache.hadoop.ozone.upgrade.UpgradeActionHdds;
+import org.apache.ratis.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature.DATANODE_SCHEMA_V3;
+import static org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.ON_FINALIZE;
+import static org.apache.hadoop.ozone.upgrade.UpgradeActionHdds.Component.DATANODE;
+
+/**
+ * Upgrade Action for DataNode for SCHEMA V3.
+ */
+@UpgradeActionHdds(feature = DATANODE_SCHEMA_V3, component = DATANODE,
+ type = ON_FINALIZE)
+public class DatanodeSchemaV3FinalizeAction
+ implements HDDSUpgradeAction<DatanodeStateMachine> {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(DatanodeSchemaV3FinalizeAction.class);
+
+ @Override
+ public void execute(DatanodeStateMachine dsm) throws Exception {
+ LOG.info("Upgrading Datanode volume layout for Schema V3 support.");
+
+ // Load RocksDB for each HddsVolume, build the relationship between
+ // HddsVolume and DbVolume if DbVolume is configured.
+ MutableVolumeSet dataVolumeSet = dsm.getContainer().getVolumeSet();
+ MutableVolumeSet dbVolumeSet = dsm.getContainer().getDbVolumeSet();
+ Preconditions.assertNotNull(dataVolumeSet,
+ "Data Volume should not be null");
+
+ dataVolumeSet.writeLock();
+ try {
+ for (StorageVolume hddsVolume : dataVolumeSet.getVolumesList()) {
+ HddsVolume dataVolume = (HddsVolume) hddsVolume;
+ if (dataVolume.getDbParentDir() != null) {
+ // The RocksDB for this hddsVolume is already created(newly added
+ // volume case).
+ continue;
+ }
+ dataVolume.createDbStore(dbVolumeSet);
+ }
+ } finally {
+ dataVolumeSet.writeUnlock();
+ }
+ DatanodeConfiguration dcf =
+ dsm.getConf().getObject(DatanodeConfiguration.class);
+ if (!dcf.getContainerSchemaV3Enabled()) {
+ LOG.info("Schema V3 is disabled. Won't load RocksDB in upgrade.");
+ return;
+ }
+ HddsVolumeUtil.loadAllHddsVolumeDbStore(dataVolumeSet, dbVolumeSet, LOG);
+ }
+}
+
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToSchemaV3.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToSchemaV3.java
new file mode 100644
index 0000000000..9082166b6f
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToSchemaV3.java
@@ -0,0 +1,759 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.upgrade;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.hadoop.ozone.container.common.ScmTestMock;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask;
+import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
+import org.apache.hadoop.ozone.container.common.volume.DbVolume;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.mockito.ArgumentMatchers.anyObject;
+
+/**
+ * Tests upgrading a single datanode from container Schema V2 to Schema V3.
+ */
+@RunWith(Parameterized.class)
+public class TestDatanodeUpgradeToSchemaV3 {
+ @Rule
+ public TemporaryFolder tempFolder;
+
+ private DatanodeStateMachine dsm;
+ private final OzoneConfiguration conf;
+ private static final String CLUSTER_ID = "clusterID";
+ private final boolean schemaV3Enabled;
+
+ private RPC.Server scmRpcServer;
+ private InetSocketAddress address;
+ private ScmTestMock scmServerImpl;
+
+ private Random random;
+
+ // hdds.datanode.container.schema.v3.enabled
+ @Parameterized.Parameters
+ public static Collection<Object[]> getSchemaFiles() {
+ Collection<Object[]> parameters = new ArrayList<>();
+ parameters.add(new Boolean[]{false});
+ parameters.add(new Boolean[]{true});
+ return parameters;
+ }
+
+ public TestDatanodeUpgradeToSchemaV3(Boolean enable) {
+ this.schemaV3Enabled = enable;
+ conf = new OzoneConfiguration();
+ conf.setBoolean(DatanodeConfiguration.CONTAINER_SCHEMA_V3_ENABLED,
+ this.schemaV3Enabled);
+ }
+
+ @Before
+ public void setup() throws Exception {
+ tempFolder = new TemporaryFolder();
+ tempFolder.create();
+ random = new Random();
+
+ address = SCMTestUtils.getReuseableAddress();
+ conf.setSocketAddr(ScmConfigKeys.OZONE_SCM_NAMES, address);
+ conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
+ tempFolder.getRoot().getAbsolutePath());
+ }
+
+ @After
+ public void teardown() throws Exception {
+ if (scmRpcServer != null) {
+ scmRpcServer.stop();
+ }
+
+ if (dsm != null) {
+ dsm.close();
+ }
+ }
+
+ /**
+ * Test RocksDB is created on data volume, not matter Schema V3 is
+ * enabled or not.
+ * If Schema V3 is enabled, RocksDB will be loaded.
+ */
+ @Test
+ public void testDBOnHddsVolume() throws Exception {
+ // start DN and SCM
+ startScmServer();
+ addHddsVolume();
+
+ startPreFinalizedDatanode();
+ HddsVolume dataVolume = (HddsVolume) dsm.getContainer().getVolumeSet()
+ .getVolumesList().get(0);
+ Assert.assertNull(dataVolume.getDbVolume());
+ Assert.assertFalse(dataVolume.isDbLoaded());
+
+ dsm.finalizeUpgrade();
+ // RocksDB is created during upgrade
+ File dbFile = new File(dataVolume.getStorageDir().getAbsolutePath() + "/" +
+ dataVolume.getClusterID() + "/" + dataVolume.getStorageID());
+ Assert.assertTrue(dbFile.exists());
+
+ // RocksDB loaded when SchemaV3 is enabled
+ if (VersionedDatanodeFeatures.SchemaV3.isFinalizedAndEnabled(conf)) {
+ Assert.assertNotNull(dataVolume.getDbParentDir().getAbsolutePath()
+ .startsWith(dataVolume.getStorageDir().toString()));
+ } else {
+ // RocksDB is not loaded when SchemaV3 is disabled.
+ Assert.assertFalse(dataVolume.isDbLoaded());
+ }
+ }
+
+ /**
+ * Test RocksDB is created on DB volume when configured, not matter
+ * Schema V3 is enabled or not.
+ * If Schema V3 is enabled, RocksDB will be loaded.
+ */
+ @Test
+ public void testDBOnDbVolume() throws Exception {
+ // start DN and SCM
+ startScmServer();
+ addHddsVolume();
+ addDbVolume();
+
+ startPreFinalizedDatanode();
+ HddsVolume dataVolume = (HddsVolume) dsm.getContainer().getVolumeSet()
+ .getVolumesList().get(0);
+ Assert.assertNull(dataVolume.getDbParentDir());
+
+ dsm.finalizeUpgrade();
+ // RocksDB is created during upgrade
+ DbVolume dbVolume = (DbVolume) dsm.getContainer().getDbVolumeSet()
+ .getVolumesList().get(0);
+ Assert.assertEquals(dbVolume, dataVolume.getDbVolume());
+ Assert.assertTrue(
+ dbVolume.getHddsVolumeIDs().contains(dataVolume.getStorageID()));
+ File dbFile = new File(dbVolume.getStorageDir().getAbsolutePath() + "/" +
+ dbVolume.getClusterID() + "/" + dataVolume.getStorageID());
+ Assert.assertTrue(dbFile.exists());
+
+ // RocksDB loaded when SchemaV3 is enabled
+ if (VersionedDatanodeFeatures.SchemaV3.isFinalizedAndEnabled(conf)) {
+ Assert.assertTrue(dataVolume.getDbParentDir().getAbsolutePath()
+ .startsWith(dbVolume.getStorageDir().toString()));
+ } else {
+ // RocksDB is not loaded when SchemaV3 is disabled.
+ Assert.assertFalse(dataVolume.isDbLoaded());
+ }
+ }
+
+ /**
+ * Test RocksDB in created in Finalize action for an existing hddsVolume.
+ * This mimics the real cluster upgrade situation.
+ */
+ @Test
+ public void testDBCreatedInFinalize() throws Exception {
+ // start DN and SCM
+ startScmServer();
+ // add one HddsVolume
+ addHddsVolume();
+
+ // Set layout version.
+ DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
+ UUID.randomUUID().toString(),
+ HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
+ layoutStorage.initialize();
+ dsm = new DatanodeStateMachine(
+ ContainerTestUtils.createDatanodeDetails(), conf, null, null, null);
+ HddsVolume dataVolume = (
+ HddsVolume) dsm.getContainer().getVolumeSet().getVolumesList().get(0);
+ // Format HddsVolume to mimic the real cluster upgrade situation
+ dataVolume.format(CLUSTER_ID);
+ File idDir = new File(dataVolume.getStorageDir(), CLUSTER_ID);
+ if (!idDir.mkdir()) {
+ Assert.fail("Failed to create id directory");
+ }
+
+ Assert.assertNull(dataVolume.getDbParentDir());
+
+ // Restart DN and finalize upgrade
+ restartDatanode(
+ HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion(), true);
+ dsm.finalizeUpgrade();
+
+ // RocksDB is created by upgrade action
+ dataVolume = ((HddsVolume) dsm.getContainer().getVolumeSet()
+ .getVolumesList().get(0));
+ Assert.assertNotNull(dataVolume.getDbParentDir());
+ if (VersionedDatanodeFeatures.SchemaV3.isFinalizedAndEnabled(conf)) {
+ Assert.assertTrue(dataVolume.isDbLoaded());
+ } else {
+ Assert.assertFalse(dataVolume.isDbLoaded());
+ }
+ }
+
+ /**
+ * Test finalize twice won't recreate any RocksDB for HddsVolume.
+ */
+ @Test
+ public void testFinalizeTwice() throws Exception {
+ // start DN and SCM
+ startScmServer();
+ // add one HddsVolume and two DbVolume
+ addHddsVolume();
+ addDbVolume();
+ addDbVolume();
+
+ startPreFinalizedDatanode();
+ dsm.finalizeUpgrade();
+
+ DbVolume dbVolume = ((HddsVolume) dsm.getContainer().getVolumeSet()
+ .getVolumesList().get(0)).getDbVolume();
+ Assert.assertNotNull(dbVolume);
+
+ dsm.finalizeUpgrade();
+ // DB Dir should be the same.
+ Assert.assertEquals(dbVolume, ((HddsVolume) dsm.getContainer()
+ .getVolumeSet().getVolumesList().get(0)).getDbVolume());
+ }
+
+ /**
+ * For a finalized cluster, add a new HddsVolume.
+ */
+ @Test
+ public void testAddHddsVolumeAfterFinalize() throws Exception {
+ // start DN and SCM
+ startScmServer();
+ addHddsVolume();
+
+ startPreFinalizedDatanode();
+ dsm.finalizeUpgrade();
+
+ // Add a new HddsVolume. It should have DB created after DN restart.
+ addHddsVolume();
+ restartDatanode(HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion(), true);
+ for (StorageVolume vol:
+ dsm.getContainer().getVolumeSet().getVolumesList()) {
+ HddsVolume hddsVolume = (HddsVolume) vol;
+ if (VersionedDatanodeFeatures.SchemaV3.isFinalizedAndEnabled(conf)) {
+ Assert.assertTrue(hddsVolume.isDbLoaded());
+ Assert.assertTrue(hddsVolume.getDbParentDir().getAbsolutePath()
+ .startsWith(hddsVolume.getStorageDir().getAbsolutePath()));
+ } else {
+ Assert.assertFalse(hddsVolume.isDbLoaded());
+ }
+ }
+ }
+
+ /**
+ * For a finalized cluster, add a new DbVolume.
+ */
+ @Test
+ public void testAddDbVolumeAfterFinalize() throws Exception {
+ startScmServer();
+ addHddsVolume();
+
+ startPreFinalizedDatanode();
+ HddsVolume hddsVolume = (HddsVolume) dsm.getContainer().getVolumeSet()
+ .getVolumesList().get(0);
+ Assert.assertNull(hddsVolume.getDbParentDir());
+ dsm.finalizeUpgrade();
+ // DB is created during upgrade
+ File dbDir = hddsVolume.getDbParentDir();
+ Assert.assertTrue(dbDir.getAbsolutePath().startsWith(
+ hddsVolume.getStorageDir().getAbsolutePath()));
+
+ // Add a new DbVolume
+ addDbVolume();
+ restartDatanode(HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion(), true);
+
+ // HddsVolume should still use the rocksDB under it's volume
+ DbVolume dbVolume = (DbVolume) dsm.getContainer().getDbVolumeSet()
+ .getVolumesList().get(0);
+ Assert.assertEquals(0, dbVolume.getHddsVolumeIDs().size());
+
+ if (VersionedDatanodeFeatures.SchemaV3.isFinalizedAndEnabled(conf)) {
+ hddsVolume = (HddsVolume) dsm.getContainer().getVolumeSet()
+ .getVolumesList().get(0);
+ Assert.assertEquals(dbDir, hddsVolume.getDbParentDir());
+ Assert.assertTrue(hddsVolume.isDbLoaded());
+ }
+ }
+
+ /**
+ * For a finalized cluster, add a new DbVolume and a new HddsVolume.
+ */
+ @Test
+ public void testAddDbAndHddsVolumeAfterFinalize() throws Exception {
+ // start DN and SCM
+ startScmServer();
+ addHddsVolume();
+
+ startPreFinalizedDatanode();
+ dsm.finalizeUpgrade();
+
+ addDbVolume();
+ File newDataVolume = addHddsVolume();
+ restartDatanode(HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion(), true);
+
+ DbVolume dbVolume = (DbVolume) dsm.getContainer().getDbVolumeSet()
+ .getVolumesList().get(0);
+
+ for (StorageVolume vol:
+ dsm.getContainer().getVolumeSet().getVolumesList()) {
+ HddsVolume hddsVolume = (HddsVolume) vol;
+ File dbFile;
+ if (hddsVolume.getStorageDir().getAbsolutePath().startsWith(
+ newDataVolume.getAbsolutePath())) {
+ if (VersionedDatanodeFeatures.SchemaV3.isFinalizedAndEnabled(conf)) {
+ Assert.assertEquals(dbVolume, hddsVolume.getDbVolume());
+ }
+ // RocksDB of newly added HddsVolume is created on the newly added
+ // DbVolume
+ dbFile = new File(dbVolume.getStorageDir() + "/" +
+ hddsVolume.getClusterID() + "/" + hddsVolume.getStorageID());
+ } else {
+ Assert.assertNull(hddsVolume.getDbVolume());
+ dbFile = new File(hddsVolume.getStorageDir() + "/" +
+ hddsVolume.getClusterID() + "/" + hddsVolume.getStorageID());
+ }
+ if (VersionedDatanodeFeatures.SchemaV3.isFinalizedAndEnabled(conf)) {
+ Assert.assertTrue(hddsVolume.isDbLoaded());
+ Assert.assertTrue(hddsVolume.getDbParentDir().exists());
+ Assert.assertTrue(dbFile.exists());
+ Assert.assertEquals(dbFile, hddsVolume.getDbParentDir());
+ }
+ }
+ }
+
+ /**
+ * Test data write after finalization.
+ */
+ @Test
+ public void testWriteWithV3Enabled() throws Exception {
+ testWrite(false, OzoneConsts.SCHEMA_V2);
+ }
+
+ /**
+ * Test data write after finalization.
+ */
+ @Test
+ public void testWriteWithV3Disabled() throws Exception {
+ testWrite(true, OzoneConsts.SCHEMA_V3);
+ }
+
+ public void testWrite(boolean enable, String expectedVersion)
+ throws Exception {
+ // start DN and SCM
+ startScmServer();
+ addHddsVolume();
+ // Disable Schema V3
+ conf.setBoolean(DatanodeConfiguration.CONTAINER_SCHEMA_V3_ENABLED, false);
+ startPreFinalizedDatanode();
+ dsm.finalizeUpgrade();
+
+ final Pipeline pipeline = getPipeline();
+ // Create a container to write data.
+ final long containerID1 = addContainer(pipeline);
+ putBlock(containerID1, pipeline);
+ closeContainer(containerID1, pipeline);
+ KeyValueContainer container = (KeyValueContainer)
+ dsm.getContainer().getContainerSet().getContainer(containerID1);
+ // When SchemaV3 is disabled, new data should be saved as SchemaV2.
+ Assert.assertEquals(OzoneConsts.SCHEMA_V2,
+ container.getContainerData().getSchemaVersion());
+
+ // Set SchemaV3 enable status
+ conf.setBoolean(DatanodeConfiguration.CONTAINER_SCHEMA_V3_ENABLED,
+ enable);
+ restartDatanode(HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion(), true);
+
+ // Write new data
+ final long containerID2 = addContainer(pipeline);
+ putBlock(containerID2, pipeline);
+ closeContainer(containerID2, pipeline);
+ container = (KeyValueContainer)
+ dsm.getContainer().getContainerSet().getContainer(containerID2);
+ // If SchemaV3 is enabled, new data should be saved as SchemaV3
+ // If SchemaV3 is still disabled, new data should still be saved as SchemaV2
+ Assert.assertEquals(expectedVersion,
+ container.getContainerData().getSchemaVersion());
+ }
+
+ /**
+ * Test data read during and after finalization.
+ */
+ @Test
+ public void testReadsDuringFinalize() throws Exception {
+ // start DN and SCM
+ startScmServer();
+ addHddsVolume();
+ startPreFinalizedDatanode();
+ final Pipeline pipeline = getPipeline();
+
+ // Add data to read.
+ final long containerID = addContainer(pipeline);
+ ContainerProtos.WriteChunkRequestProto writeChunk = putBlock(containerID,
+ pipeline);
+ closeContainer(containerID, pipeline);
+
+ // Create thread to keep reading during finalization.
+ ExecutorService executor = Executors.newFixedThreadPool(1);
+ Future<Void> readFuture = executor.submit(() -> {
+ // Layout version check should be thread safe.
+ while (!dsm.getLayoutVersionManager()
+ .isAllowed(HDDSLayoutFeature.DATANODE_SCHEMA_V3)) {
+ readChunk(writeChunk, pipeline);
+ }
+ // Make sure we can read after finalizing too.
+ readChunk(writeChunk, pipeline);
+ return null;
+ });
+
+ dsm.finalizeUpgrade();
+ // If there was a failure reading during the upgrade, the exception will
+ // be thrown here.
+ readFuture.get();
+ }
+
+ /**
+ * Test finalization failure.
+ */
+ @Test
+ public void testFinalizeFailure() throws Exception {
+ // start DN and SCM
+ startScmServer();
+ addHddsVolume();
+ // Let HddsVolume be formatted to mimic the real cluster upgrade
+ // Set layout version.
+ DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
+ UUID.randomUUID().toString(),
+ HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
+ layoutStorage.initialize();
+ dsm = new DatanodeStateMachine(
+ ContainerTestUtils.createDatanodeDetails(), conf, null, null, null);
+ HddsVolume dataVolume = (
+ HddsVolume) dsm.getContainer().getVolumeSet().getVolumesList().get(0);
+ // Format HddsVolume to mimic the real cluster upgrade situation
+ dataVolume.format(CLUSTER_ID);
+ File idDir = new File(dataVolume.getStorageDir(), CLUSTER_ID);
+ if (!idDir.mkdir()) {
+ Assert.fail("Failed to create id directory");
+ }
+ Assert.assertNull(dataVolume.getDbParentDir());
+
+ // Restart DN
+ restartDatanode(
+ HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion(), true);
+
+ // Write some data.
+ final Pipeline pipeline = getPipeline();
+ final long containerID = addContainer(pipeline);
+ ContainerProtos.WriteChunkRequestProto writeChunk = putBlock(containerID,
+ pipeline);
+ closeContainer(containerID, pipeline);
+ KeyValueContainer container = (KeyValueContainer)
+ dsm.getContainer().getContainerSet().getContainer(containerID);
+ Assert.assertEquals(OzoneConsts.SCHEMA_V2,
+ container.getContainerData().getSchemaVersion());
+
+
+ HddsVolume volume = Mockito.mock(HddsVolume.class);
+ Mockito.doThrow(new IOException("Failed to init DB")).when(volume).
+ createDbStore(anyObject());
+ Map volumeMap = new HashMap<String, StorageVolume>();
+ volumeMap.put(dataVolume.getStorageID(), volume);
+ dsm.getContainer().getVolumeSet().setVolumeMap(volumeMap);
+
+ // Finalize will fail because of DB creation failure
+ try {
+ dsm.finalizeUpgrade();
+ } catch (Exception e) {
+ // Currently there will be retry if finalization failed.
+ // Let's assume retry is terminated by user.
+ }
+
+ // Check that old data is readable
+ container = (KeyValueContainer)
+ dsm.getContainer().getContainerSet().getContainer(containerID);
+ Assert.assertEquals(OzoneConsts.SCHEMA_V2,
+ container.getContainerData().getSchemaVersion());
+ readChunk(writeChunk, pipeline);
+
+ // SchemaV3 is not finalized, so still ERASURE_CODED_STORAGE_SUPPORT
+ restartDatanode(
+ HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion(), true);
+
+ // Old data is readable after DN restart
+ container = (KeyValueContainer)
+ dsm.getContainer().getContainerSet().getContainer(containerID);
+ Assert.assertEquals(OzoneConsts.SCHEMA_V2,
+ container.getContainerData().getSchemaVersion());
+ readChunk(writeChunk, pipeline);
+ }
+
+ public void checkContainerPathID(long containerID, String expectedID) {
+ KeyValueContainerData data =
+ (KeyValueContainerData) dsm.getContainer().getContainerSet()
+ .getContainer(containerID).getContainerData();
+ Assert.assertTrue(data.getChunksPath().contains(expectedID));
+ Assert.assertTrue(data.getMetadataPath().contains(expectedID));
+ }
+
+ public List<File> getHddsSubdirs(File volume) {
+ File[] subdirsArray = getHddsRoot(volume).listFiles(File::isDirectory);
+ Assert.assertNotNull(subdirsArray);
+ return Arrays.asList(subdirsArray);
+ }
+
+ public File getHddsRoot(File volume) {
+ return new File(HddsVolumeUtil.getHddsRoot(volume.getAbsolutePath()));
+ }
+
+ /**
+ * Starts the datanode with the fore layout version, and calls the version
+ * endpoint task to get cluster ID and SCM ID.
+ *
+ * The daemon for the datanode state machine is not started in this test.
+ * This greatly speeds up execution time.
+ * It means we do not have heartbeat functionality or pre-finalize
+ * upgrade actions, but neither of those things are needed for these tests.
+ */
+ public void startPreFinalizedDatanode() throws Exception {
+ // Set layout version.
+ conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
+ tempFolder.getRoot().getAbsolutePath());
+ DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
+ UUID.randomUUID().toString(),
+ HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
+ layoutStorage.initialize();
+
+ // Build and start the datanode.
+ DatanodeDetails dd = ContainerTestUtils.createDatanodeDetails();
+ DatanodeStateMachine newDsm = new DatanodeStateMachine(dd,
+ conf, null, null, null);
+ int actualMlv = newDsm.getLayoutVersionManager().getMetadataLayoutVersion();
+ Assert.assertEquals(
+ HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion(),
+ actualMlv);
+ if (dsm != null) {
+ dsm.close();
+ }
+ dsm = newDsm;
+
+ callVersionEndpointTask();
+ }
+
+ public void restartDatanode(int expectedMlv, boolean exactMatch)
+ throws Exception {
+ // Stop existing datanode.
+ DatanodeDetails dd = dsm.getDatanodeDetails();
+ dsm.close();
+
+ // Start new datanode with the same configuration.
+ dsm = new DatanodeStateMachine(dd,
+ conf, null, null, null);
+ int mlv = dsm.getLayoutVersionManager().getMetadataLayoutVersion();
+ if (exactMatch) {
+ Assert.assertEquals(expectedMlv, mlv);
+ } else {
+ Assert.assertTrue("Expected minimum mlv(" + expectedMlv
+ + ") is smaller than mlv(" + mlv + ").", expectedMlv <= mlv);
+ }
+
+ callVersionEndpointTask();
+ }
+
+ /**
+ * Get the cluster ID and SCM ID from SCM to the datanode.
+ */
+ public void callVersionEndpointTask() throws Exception {
+ try (EndpointStateMachine esm = ContainerTestUtils.createEndpoint(conf,
+ address, 1000)) {
+ VersionEndpointTask vet = new VersionEndpointTask(esm, conf,
+ dsm.getContainer());
+ esm.setState(EndpointStateMachine.EndPointStates.GETVERSION);
+ vet.call();
+ }
+ }
+
+ public String startScmServer() throws IOException {
+ String scmID = UUID.randomUUID().toString();
+ scmServerImpl = new ScmTestMock(CLUSTER_ID, scmID);
+ scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
+ scmServerImpl, address, 10);
+ return scmID;
+ }
+
+ /// CONTAINER OPERATIONS ///
+ public void readChunk(ContainerProtos.WriteChunkRequestProto writeChunk,
+ Pipeline pipeline) throws Exception {
+ ContainerProtos.ContainerCommandRequestProto readChunkRequest =
+ ContainerTestHelper.getReadChunkRequest(pipeline, writeChunk);
+
+ dispatchRequest(readChunkRequest);
+ }
+
+ public ContainerProtos.WriteChunkRequestProto putBlock(long containerID,
+ Pipeline pipeline) throws Exception {
+ ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
+ getWriteChunk(containerID, pipeline);
+ dispatchRequest(writeChunkRequest);
+
+ ContainerProtos.ContainerCommandRequestProto putBlockRequest =
+ ContainerTestHelper.getPutBlockRequest(pipeline,
+ writeChunkRequest.getWriteChunk());
+ dispatchRequest(putBlockRequest);
+
+ return writeChunkRequest.getWriteChunk();
+ }
+
+ public ContainerProtos.ContainerCommandRequestProto getWriteChunk(
+ long containerID, Pipeline pipeline) throws Exception {
+ return ContainerTestHelper.getWriteChunkRequest(pipeline,
+ ContainerTestHelper.getTestBlockID(containerID), 100, null);
+ }
+
+ public Pipeline getPipeline() {
+ return MockPipeline.createPipeline(
+ Collections.singletonList(dsm.getDatanodeDetails()));
+ }
+
+ public long addContainer(Pipeline pipeline)
+ throws Exception {
+ long containerID = random.nextInt(Integer.MAX_VALUE);
+ ContainerProtos.ContainerCommandRequestProto createContainerRequest =
+ ContainerTestHelper.getCreateContainerRequest(containerID, pipeline);
+ dispatchRequest(createContainerRequest);
+
+ return containerID;
+ }
+
+ public void deleteContainer(long containerID, Pipeline pipeline)
+ throws Exception {
+ ContainerProtos.ContainerCommandRequestProto deleteContainerRequest =
+ ContainerTestHelper.getDeleteContainer(pipeline, containerID, true);
+ dispatchRequest(deleteContainerRequest);
+ }
+
+ public void closeContainer(long containerID, Pipeline pipeline)
+ throws Exception {
+ closeContainer(containerID, pipeline, ContainerProtos.Result.SUCCESS);
+ }
+
+ public void closeContainer(long containerID, Pipeline pipeline,
+ ContainerProtos.Result expectedResult) throws Exception {
+ ContainerProtos.ContainerCommandRequestProto closeContainerRequest =
+ ContainerTestHelper.getCloseContainer(pipeline, containerID);
+ dispatchRequest(closeContainerRequest, expectedResult);
+ }
+
+ public void dispatchRequest(
+ ContainerProtos.ContainerCommandRequestProto request) {
+ dispatchRequest(request, ContainerProtos.Result.SUCCESS);
+ }
+
+ public void dispatchRequest(
+ ContainerProtos.ContainerCommandRequestProto request,
+ ContainerProtos.Result expectedResult) {
+ ContainerProtos.ContainerCommandResponseProto response =
+ dsm.getContainer().getDispatcher().dispatch(request, null);
+ Assert.assertEquals(expectedResult, response.getResult());
+ }
+
+ /// VOLUME OPERATIONS ///
+
+ /**
+ * Append a datanode volume to the existing volumes in the configuration.
+ * @return The root directory for the new volume.
+ */
+ public File addHddsVolume() throws IOException {
+ File vol = tempFolder.newFolder(UUID.randomUUID().toString());
+ String[] existingVolumes =
+ conf.getStrings(ScmConfigKeys.HDDS_DATANODE_DIR_KEY);
+ List<String> allVolumes = new ArrayList<>();
+ if (existingVolumes != null) {
+ allVolumes.addAll(Arrays.asList(existingVolumes));
+ }
+
+ allVolumes.add(vol.getAbsolutePath());
+ conf.setStrings(ScmConfigKeys.HDDS_DATANODE_DIR_KEY,
+ allVolumes.toArray(new String[0]));
+
+ return vol;
+ }
+
+ /**
+ * Append a db volume to the existing volumes in the configuration.
+ * @return The root directory for the new volume.
+ */
+ public File addDbVolume() throws Exception {
+ File vol = tempFolder.newFolder(UUID.randomUUID().toString());
+ String[] existingVolumes =
+ conf.getStrings(OzoneConfigKeys.HDDS_DATANODE_CONTAINER_DB_DIR);
+ List<String> allVolumes = new ArrayList<>();
+ if (existingVolumes != null) {
+ allVolumes.addAll(Arrays.asList(existingVolumes));
+ }
+
+ allVolumes.add(vol.getAbsolutePath());
+ conf.setStrings(OzoneConfigKeys.HDDS_DATANODE_CONTAINER_DB_DIR,
+ allVolumes.toArray(new String[0]));
+
+ return vol;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org