Posted to commits@ozone.apache.org by sa...@apache.org on 2022/05/17 04:42:43 UTC

[ozone] branch HDDS-3630 updated: HDDS-6597. [Merge rocksdb in datanode] Non-rolling upgrade supports container Schema V3. (#3392)

This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-3630
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3630 by this push:
     new 3178ba5a94 HDDS-6597. [Merge rocksdb in datanode] Non-rolling upgrade supports container Schema V3. (#3392)
3178ba5a94 is described below

commit 3178ba5a94564799934d971ad66413a1d22b41f3
Author: Sammi Chen <sa...@apache.org>
AuthorDate: Tue May 17 12:42:39 2022 +0800

    HDDS-6597. [Merge rocksdb in datanode] Non-rolling upgrade supports container Schema V3. (#3392)
---
 .../upgrade/AbstractLayoutVersionManager.java      |  12 +-
 .../common/statemachine/DatanodeStateMachine.java  |   7 +
 .../states/endpoint/VersionEndpointTask.java       |   8 +-
 .../ozone/container/common/volume/HddsVolume.java  |  63 +-
 .../container/common/volume/MutableVolumeSet.java  |   5 +
 .../container/common/volume/StorageVolume.java     |   7 +
 .../ozone/container/ozoneimpl/OzoneContainer.java  |  17 +-
 .../upgrade/DatanodeSchemaV3FinalizeAction.java    |  82 +++
 .../upgrade/TestDatanodeUpgradeToSchemaV3.java     | 759 +++++++++++++++++++++
 9 files changed, 928 insertions(+), 32 deletions(-)
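
At a glance, the patch separates two gates that were previously tied together:
the per-volume RocksDB is now *created* once the DATANODE_SCHEMA_V3 layout
feature is finalized, but it is *loaded* only when the
hdds.datanode.container.schema.v3.enabled flag is also true. A minimal model
of the two decisions (plain Java with placeholder names, not Ozone APIs):

    // Hypothetical model of the decision logic; names are placeholders.
    final class SchemaV3Gates {
      // The DB store is created as soon as the layout feature is finalized,
      // regardless of the enable flag (see DatanodeSchemaV3FinalizeAction).
      static boolean shouldCreateDb(boolean featureFinalized) {
        return featureFinalized;
      }
      // The DB store is loaded only when the config flag is also enabled
      // (see the OzoneContainer and HddsVolume hunks below).
      static boolean shouldLoadDb(boolean featureFinalized, boolean flagEnabled) {
        return featureFinalized && flagEnabled;
      }
    }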

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
index 2ce6e02409..ccf7c1fba8 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/AbstractLayoutVersionManager.java
@@ -37,6 +37,8 @@ import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.Status;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.management.ObjectName;
+
 /**
  * Layout Version Manager containing generic method implementations.
  */
@@ -58,6 +60,7 @@ public abstract class AbstractLayoutVersionManager<T extends LayoutFeature>
   // Note that MLV may have been incremented during the upgrade
   // by the time the value is read/used.
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private ObjectName mBean;
 
   protected void init(int version, T[] lfs) throws IOException {
     lock.writeLock().lock();
@@ -81,7 +84,7 @@ public abstract class AbstractLayoutVersionManager<T extends LayoutFeature>
           mlvFeature, mlvFeature.layoutVersion(),
           slvFeature, slvFeature.layoutVersion());
 
-      MBeans.register("LayoutVersionManager",
+      mBean = MBeans.register("LayoutVersionManager",
           getClass().getSimpleName(), this);
     } finally {
       lock.writeLock().unlock();
@@ -215,4 +218,11 @@ public abstract class AbstractLayoutVersionManager<T extends LayoutFeature>
       lock.readLock().unlock();
     }
   }
+
+  public void close() {
+    if (mBean != null) {
+      MBeans.unregister(mBean);
+      mBean = null;
+    }
+  }
 }
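
The hunk above pairs every MBeans.register with an unregister: the ObjectName
handle returned at registration is now kept and released in close(), which
DatanodeStateMachine invokes on shutdown (next hunk). A minimal sketch of the
same pairing, where MyVersionManager is a hypothetical stand-in (the real
class also implements an MXBean interface so the registration succeeds):

    import javax.management.ObjectName;
    import org.apache.hadoop.metrics2.util.MBeans;

    public class MyVersionManager implements AutoCloseable {
      private ObjectName mBean;

      public void init() {
        // Keep the handle so the bean can be unregistered later.
        mBean = MBeans.register("LayoutVersionManager",
            getClass().getSimpleName(), this);
      }

      @Override
      public void close() {
        if (mBean != null) {
          MBeans.unregister(mBean);
          mBean = null;  // makes close() safe to call twice
        }
      }
    }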
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index e58418fc76..63b3055364 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -360,6 +360,9 @@ public class DatanodeStateMachine implements Closeable {
     if (cmdProcessThread != null) {
       cmdProcessThread.interrupt();
     }
+    if (layoutVersionManager != null) {
+      layoutVersionManager.close();
+    }
     context.setState(DatanodeStates.getLastState());
     replicationSupervisorMetrics.unRegister();
     executorService.shutdown();
@@ -680,4 +683,8 @@ public class DatanodeStateMachine implements Closeable {
   public UpgradeFinalizer<DatanodeStateMachine> getUpgradeFinalizer() {
     return upgradeFinalizer;
   }
+
+  public ConfigurationSource getConf() {
+    return conf;
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
index 9e0669c8e2..f9b0d882f0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures.SchemaV3;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 
@@ -82,10 +81,9 @@ public class VersionEndpointTask implements
           Preconditions.checkNotNull(clusterId,
               "Reply from SCM: clusterId cannot be null");
 
-          // Check DbVolumes
-          if (SchemaV3.isFinalizedAndEnabled(configuration)) {
-            checkVolumeSet(ozoneContainer.getDbVolumeSet(), scmId, clusterId);
-          }
+          // Check DbVolumes; format DbVolume at first registration.
+          checkVolumeSet(ozoneContainer.getDbVolumeSet(), scmId, clusterId);
+
           // Check HddsVolumes
           checkVolumeSet(ozoneContainer.getVolumeSet(), scmId, clusterId);
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
index 513882eb9e..9fc07840da 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
@@ -22,13 +22,17 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
 
+import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
 import org.apache.hadoop.ozone.container.common.utils.DatanodeStoreCache;
+import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
 import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
+import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
 import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures.SchemaV3;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,6 +73,7 @@ public class HddsVolume extends StorageVolume {
   // container db path. This is initialized only once together with dbVolume,
   // and stored as a member to prevent spawning lots of File objects.
   private File dbParentDir;
+  private AtomicBoolean dbLoaded = new AtomicBoolean(false);
 
   /**
    * Builder for HddsVolume.
@@ -114,7 +119,9 @@ public class HddsVolume extends StorageVolume {
       MutableVolumeSet dbVolumeSet) throws IOException {
     super.createWorkingDir(workingDirName, dbVolumeSet);
 
-    if (SchemaV3.isFinalizedAndEnabled(getConf())) {
+    // Create DB store for a newly formatted volume
+    if (VersionedDatanodeFeatures.isFinalized(
+        HDDSLayoutFeature.DATANODE_SCHEMA_V3)) {
       createDbStore(dbVolumeSet);
     }
   }
@@ -133,9 +140,7 @@ public class HddsVolume extends StorageVolume {
     if (volumeIOStats != null) {
       volumeIOStats.unregister();
     }
-    if (SchemaV3.isFinalizedAndEnabled(getConf())) {
-      closeDbStore();
-    }
+    closeDbStore();
   }
 
   @Override
@@ -144,9 +149,7 @@ public class HddsVolume extends StorageVolume {
     if (volumeIOStats != null) {
       volumeIOStats.unregister();
     }
-    if (SchemaV3.isFinalizedAndEnabled(getConf())) {
-      closeDbStore();
-    }
+    closeDbStore();
   }
 
   /**
@@ -178,6 +181,10 @@ public class HddsVolume extends StorageVolume {
     return this.dbParentDir;
   }
 
+  public boolean isDbLoaded() {
+    return dbLoaded.get();
+  }
+
   public void loadDbStore() throws IOException {
     // DN startup for the first time, not registered yet,
     // so the DbVolume is not formatted.
@@ -185,6 +192,13 @@ public class HddsVolume extends StorageVolume {
       return;
     }
 
+    // DB is already loaded
+    if (dbLoaded.get()) {
+      LOG.warn("Schema V3 db is already loaded from {} for volume {}",
+          getDbParentDir(), getStorageID());
+      return;
+    }
+
     File clusterIdDir = new File(dbVolume == null ?
         getStorageDir() : dbVolume.getStorageDir(),
         getClusterID());
@@ -214,6 +228,9 @@ public class HddsVolume extends StorageVolume {
     }
 
     dbParentDir = storageIdDir;
+    dbLoaded.set(true);
+    LOG.info("SchemaV3 db is loaded at {} for volume {}", containerDBPath,
+        getStorageID());
   }
 
   /**
@@ -221,21 +238,22 @@ public class HddsVolume extends StorageVolume {
    * Use the HddsVolume directly if no DbVolume found.
    * @param dbVolumeSet
    */
-  public void createDbStore(MutableVolumeSet dbVolumeSet)
-      throws IOException {
+  public void createDbStore(MutableVolumeSet dbVolumeSet) throws IOException {
     DbVolume chosenDbVolume = null;
     File clusterIdDir;
+    String workingDir = getWorkingDir() == null ? getClusterID() :
+        getWorkingDir();
 
     if (dbVolumeSet == null || dbVolumeSet.getVolumesList().isEmpty()) {
       // No extra db volumes specified, just create db under the HddsVolume.
-      clusterIdDir = new File(getStorageDir(), getClusterID());
+      clusterIdDir = new File(getStorageDir(), workingDir);
     } else {
       // Randomly choose a DbVolume for simplicity.
       List<DbVolume> dbVolumeList = StorageVolumeUtil.getDbVolumesList(
           dbVolumeSet.getVolumesList());
       chosenDbVolume = dbVolumeList.get(
           ThreadLocalRandom.current().nextInt(dbVolumeList.size()));
-      clusterIdDir = new File(chosenDbVolume.getStorageDir(), getClusterID());
+      clusterIdDir = new File(chosenDbVolume.getStorageDir(), workingDir);
     }
 
     if (!clusterIdDir.exists()) {
@@ -252,14 +270,19 @@ public class HddsVolume extends StorageVolume {
           + getStorageID());
     }
 
-    // Init the db instance for HddsVolume under the subdir above.
+    // Create the db instance for HddsVolume under the subdir above.
     String containerDBPath = new File(storageIdDir, CONTAINER_DB_NAME)
         .getAbsolutePath();
     try {
-      initPerDiskDBStore(containerDBPath, getConf());
+      HddsVolumeUtil.initPerDiskDBStore(containerDBPath, getConf());
+      dbLoaded.set(true);
+      LOG.info("SchemaV3 db is created and loaded at {} for volume {}",
+          containerDBPath, getStorageID());
     } catch (IOException e) {
-      throw new IOException("Can't init db instance under path "
-          + containerDBPath + " for volume " + getStorageID());
+      String errMsg = "Can't create db instance under path "
+          + containerDBPath + " for volume " + getStorageID();
+      LOG.error(errMsg, e);
+      throw new IOException(errMsg);
     }
 
     // Set the dbVolume and dbParentDir of the HddsVolume for db path lookup.
@@ -268,15 +291,23 @@ public class HddsVolume extends StorageVolume {
     if (chosenDbVolume != null) {
       chosenDbVolume.addHddsDbStorePath(getStorageID(), containerDBPath);
     }
+
+    // If SchemaV3 is disabled, close the DB instance
+    if (!SchemaV3.isFinalizedAndEnabled(getConf())) {
+      closeDbStore();
+    }
   }
 
   private void closeDbStore() {
-    if (dbParentDir == null) {
+    if (!dbLoaded.get()) {
       return;
     }
 
     String containerDBPath = new File(dbParentDir, CONTAINER_DB_NAME)
         .getAbsolutePath();
     DatanodeStoreCache.getInstance().removeDB(containerDBPath);
+    dbLoaded.set(false);
+    LOG.info("SchemaV3 db is stopped at {} for volume {}", containerDBPath,
+        getStorageID());
   }
 }
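
HddsVolume now tracks DB state explicitly with an AtomicBoolean instead of
inferring it from configuration, which makes loadDbStore/closeDbStore
idempotent: a second load is just a warning, and closing an unloaded store is
a no-op. A stripped-down sketch of that guard (store open/close bodies
elided; note the get()/set() pair is not atomic on its own, and the patch
appears to rely on volume lifecycle calls being serialized):

    import java.util.concurrent.atomic.AtomicBoolean;

    class DbLoadGuard {
      private final AtomicBoolean dbLoaded = new AtomicBoolean(false);

      void load() {
        if (dbLoaded.get()) {
          return;  // already loaded; HddsVolume logs a warning here
        }
        // ... open the per-volume RocksDB store ...
        dbLoaded.set(true);
      }

      void close() {
        if (!dbLoaded.get()) {
          return;  // never loaded, or already closed
        }
        // ... drop the store from DatanodeStoreCache ...
        dbLoaded.set(false);
      }
    }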
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java
index 78f0b9c4b7..22571685df 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/MutableVolumeSet.java
@@ -454,6 +454,11 @@ public class MutableVolumeSet implements VolumeSet {
     return ImmutableMap.copyOf(volumeMap);
   }
 
+  @VisibleForTesting
+  public void setVolumeMap(Map<String, StorageVolume> map) {
+    this.volumeMap = map;
+  }
+
   @VisibleForTesting
   public Map<StorageType, List<StorageVolume>> getVolumeStateMap() {
     return ImmutableMap.copyOf(volumeStateMap);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
index 18468f5a9d..18892b68a2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java
@@ -109,6 +109,8 @@ public abstract class StorageVolume
 
   private final VolumeSet volumeSet;
 
+  private String workingDir;
+
   protected StorageVolume(Builder<?> b) throws IOException {
     if (!b.failedVolume) {
       StorageLocation location = StorageLocation.parse(b.volumeRootStr);
@@ -188,6 +190,7 @@ public abstract class StorageVolume
       throw new IOException("Unable to create ID directory " + idDir +
           " for datanode.");
     }
+    this.workingDir = workingDirName;
   }
 
   private VolumeState analyzeVolumeState() {
@@ -381,6 +384,10 @@ public abstract class StorageVolume
     return this.storageDir;
   }
 
+  public String getWorkingDir() {
+    return this.workingDir;
+  }
+
   public void refreshVolumeInfo() {
     volumeInfo.refreshNow();
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 2709ea5e3e..df61cac960 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -123,10 +123,8 @@ public class OzoneContainer {
    * @throws IOException
    */
   public OzoneContainer(
-      DatanodeDetails datanodeDetails, ConfigurationSource
-      conf, StateContext context, CertificateClient certClient
-  )
-      throws IOException {
+      DatanodeDetails datanodeDetails, ConfigurationSource conf,
+      StateContext context, CertificateClient certClient) throws IOException {
     config = conf;
     this.datanodeDetails = datanodeDetails;
     this.context = context;
@@ -137,13 +135,12 @@ public class OzoneContainer {
     volumeSet.setFailedVolumeListener(this::handleVolumeFailures);
     metaVolumeSet = new MutableVolumeSet(datanodeDetails.getUuidString(), conf,
         context, VolumeType.META_VOLUME, volumeChecker);
-    if (SchemaV3.isFinalizedAndEnabled(conf)) {
-      dbVolumeSet = HddsServerUtil.getDatanodeDbDirs(conf).isEmpty() ? null :
-          new MutableVolumeSet(datanodeDetails.getUuidString(), conf,
-              context, VolumeType.DB_VOLUME, volumeChecker);
+
+    dbVolumeSet = HddsServerUtil.getDatanodeDbDirs(conf).isEmpty() ? null :
+        new MutableVolumeSet(datanodeDetails.getUuidString(), conf,
+            context, VolumeType.DB_VOLUME, volumeChecker);
+    if (SchemaV3.isFinalizedAndEnabled(config)) {
       HddsVolumeUtil.loadAllHddsVolumeDbStore(volumeSet, dbVolumeSet, LOG);
-    } else {
-      dbVolumeSet = null;
     }
 
     containerSet = new ContainerSet();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DatanodeSchemaV3FinalizeAction.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DatanodeSchemaV3FinalizeAction.java
new file mode 100644
index 0000000000..7436284c72
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/upgrade/DatanodeSchemaV3FinalizeAction.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.upgrade;
+
+import org.apache.hadoop.hdds.upgrade.HDDSUpgradeAction;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
+import org.apache.hadoop.ozone.upgrade.UpgradeActionHdds;
+import org.apache.ratis.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature.DATANODE_SCHEMA_V3;
+import static org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.ON_FINALIZE;
+import static org.apache.hadoop.ozone.upgrade.UpgradeActionHdds.Component.DATANODE;
+
+/**
+ * Upgrade Action for DataNode for SCHEMA V3.
+ */
+@UpgradeActionHdds(feature = DATANODE_SCHEMA_V3, component = DATANODE,
+    type = ON_FINALIZE)
+public class DatanodeSchemaV3FinalizeAction
+    implements HDDSUpgradeAction<DatanodeStateMachine> {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(DatanodeSchemaV3FinalizeAction.class);
+
+  @Override
+  public void execute(DatanodeStateMachine dsm) throws Exception {
+    LOG.info("Upgrading Datanode volume layout for Schema V3 support.");
+
+    // Load RocksDB for each HddsVolume, build the relationship between
+    // HddsVolume and DbVolume if DbVolume is configured.
+    MutableVolumeSet dataVolumeSet = dsm.getContainer().getVolumeSet();
+    MutableVolumeSet dbVolumeSet = dsm.getContainer().getDbVolumeSet();
+    Preconditions.assertNotNull(dataVolumeSet,
+        "Data Volume should not be null");
+
+    dataVolumeSet.writeLock();
+    try {
+      for (StorageVolume hddsVolume : dataVolumeSet.getVolumesList()) {
+        HddsVolume dataVolume = (HddsVolume) hddsVolume;
+        if (dataVolume.getDbParentDir() != null) {
+          // The RocksDB for this hddsVolume is already created (newly
+          // added volume case).
+          continue;
+        }
+        dataVolume.createDbStore(dbVolumeSet);
+      }
+    } finally {
+      dataVolumeSet.writeUnlock();
+    }
+    DatanodeConfiguration dcf =
+        dsm.getConf().getObject(DatanodeConfiguration.class);
+    if (!dcf.getContainerSchemaV3Enabled()) {
+      LOG.info("Schema V3 is disabled. Won't load RocksDB in upgrade.");
+      return;
+    }
+    HddsVolumeUtil.loadAllHddsVolumeDbStore(dataVolumeSet, dbVolumeSet, LOG);
+  }
+}
+
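
After this action runs, the layout version manager reports the feature as
allowed; the new test below uses exactly that check to detect finalization.
An illustrative helper (SchemaV3Check is not part of the patch):

    import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
    import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;

    final class SchemaV3Check {
      // True once DATANODE_SCHEMA_V3 has been finalized on this datanode.
      static boolean v3Finalized(DatanodeStateMachine dsm) {
        return dsm.getLayoutVersionManager()
            .isAllowed(HDDSLayoutFeature.DATANODE_SCHEMA_V3);
      }
    }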
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToSchemaV3.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToSchemaV3.java
new file mode 100644
index 0000000000..9082166b6f
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToSchemaV3.java
@@ -0,0 +1,759 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.upgrade;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.hadoop.ozone.container.common.ScmTestMock;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask;
+import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
+import org.apache.hadoop.ozone.container.common.volume.DbVolume;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.mockito.ArgumentMatchers.anyObject;
+
+/**
+ * Tests upgrading a single datanode from container Schema V2 to Schema V3.
+ */
+@RunWith(Parameterized.class)
+public class TestDatanodeUpgradeToSchemaV3 {
+  @Rule
+  public TemporaryFolder tempFolder;
+
+  private DatanodeStateMachine dsm;
+  private final OzoneConfiguration conf;
+  private static final String CLUSTER_ID = "clusterID";
+  private final boolean schemaV3Enabled;
+
+  private RPC.Server scmRpcServer;
+  private InetSocketAddress address;
+  private ScmTestMock scmServerImpl;
+
+  private Random random;
+
+  // hdds.datanode.container.schema.v3.enabled
+  @Parameterized.Parameters
+  public static Collection<Object[]> getSchemaFiles() {
+    Collection<Object[]> parameters = new ArrayList<>();
+    parameters.add(new Boolean[]{false});
+    parameters.add(new Boolean[]{true});
+    return parameters;
+  }
+
+  public TestDatanodeUpgradeToSchemaV3(Boolean enable) {
+    this.schemaV3Enabled = enable;
+    conf = new OzoneConfiguration();
+    conf.setBoolean(DatanodeConfiguration.CONTAINER_SCHEMA_V3_ENABLED,
+        this.schemaV3Enabled);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    tempFolder = new TemporaryFolder();
+    tempFolder.create();
+    random = new Random();
+
+    address = SCMTestUtils.getReuseableAddress();
+    conf.setSocketAddr(ScmConfigKeys.OZONE_SCM_NAMES, address);
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
+        tempFolder.getRoot().getAbsolutePath());
+  }
+
+  @After
+  public void teardown() throws Exception {
+    if (scmRpcServer != null) {
+      scmRpcServer.stop();
+    }
+
+    if (dsm != null) {
+      dsm.close();
+    }
+  }
+
+  /**
+   * Test that RocksDB is created on the data volume, no matter whether
+   * Schema V3 is enabled or not.
+   * If Schema V3 is enabled, RocksDB will be loaded.
+   */
+  @Test
+  public void testDBOnHddsVolume() throws Exception {
+    // start DN and SCM
+    startScmServer();
+    addHddsVolume();
+
+    startPreFinalizedDatanode();
+    HddsVolume dataVolume = (HddsVolume) dsm.getContainer().getVolumeSet()
+        .getVolumesList().get(0);
+    Assert.assertNull(dataVolume.getDbVolume());
+    Assert.assertFalse(dataVolume.isDbLoaded());
+
+    dsm.finalizeUpgrade();
+    // RocksDB is created during upgrade
+    File dbFile = new File(dataVolume.getStorageDir().getAbsolutePath() + "/" +
+        dataVolume.getClusterID() + "/" + dataVolume.getStorageID());
+    Assert.assertTrue(dbFile.exists());
+
+    // RocksDB loaded when SchemaV3 is enabled
+    if (VersionedDatanodeFeatures.SchemaV3.isFinalizedAndEnabled(conf)) {
+      Assert.assertTrue(dataVolume.getDbParentDir().getAbsolutePath()
+          .startsWith(dataVolume.getStorageDir().toString()));
+    } else {
+      // RocksDB is not loaded when SchemaV3 is disabled.
+      Assert.assertFalse(dataVolume.isDbLoaded());
+    }
+  }
+
+  /**
+   * Test that RocksDB is created on the DB volume when configured, no
+   * matter whether Schema V3 is enabled or not.
+   * If Schema V3 is enabled, RocksDB will be loaded.
+   */
+  @Test
+  public void testDBOnDbVolume() throws Exception {
+    // start DN and SCM
+    startScmServer();
+    addHddsVolume();
+    addDbVolume();
+
+    startPreFinalizedDatanode();
+    HddsVolume dataVolume = (HddsVolume) dsm.getContainer().getVolumeSet()
+        .getVolumesList().get(0);
+    Assert.assertNull(dataVolume.getDbParentDir());
+
+    dsm.finalizeUpgrade();
+    // RocksDB is created during upgrade
+    DbVolume dbVolume = (DbVolume) dsm.getContainer().getDbVolumeSet()
+        .getVolumesList().get(0);
+    Assert.assertEquals(dbVolume, dataVolume.getDbVolume());
+    Assert.assertTrue(
+        dbVolume.getHddsVolumeIDs().contains(dataVolume.getStorageID()));
+    File dbFile = new File(dbVolume.getStorageDir().getAbsolutePath() + "/" +
+        dbVolume.getClusterID() + "/" + dataVolume.getStorageID());
+    Assert.assertTrue(dbFile.exists());
+
+    // RocksDB loaded when SchemaV3 is enabled
+    if (VersionedDatanodeFeatures.SchemaV3.isFinalizedAndEnabled(conf)) {
+      Assert.assertTrue(dataVolume.getDbParentDir().getAbsolutePath()
+          .startsWith(dbVolume.getStorageDir().toString()));
+    } else {
+      // RocksDB is not loaded when SchemaV3 is disabled.
+      Assert.assertFalse(dataVolume.isDbLoaded());
+    }
+  }
+
+  /**
+   * Test that RocksDB is created in the finalize action for an existing HddsVolume.
+   * This mimics the real cluster upgrade situation.
+   */
+  @Test
+  public void testDBCreatedInFinalize() throws Exception {
+    // start DN and SCM
+    startScmServer();
+    // add one HddsVolume
+    addHddsVolume();
+
+    // Set layout version.
+    DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
+        UUID.randomUUID().toString(),
+        HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
+    layoutStorage.initialize();
+    dsm = new DatanodeStateMachine(
+        ContainerTestUtils.createDatanodeDetails(), conf, null, null, null);
+    HddsVolume dataVolume = (HddsVolume) dsm.getContainer()
+        .getVolumeSet().getVolumesList().get(0);
+    // Format HddsVolume to mimic the real cluster upgrade situation
+    dataVolume.format(CLUSTER_ID);
+    File idDir = new File(dataVolume.getStorageDir(), CLUSTER_ID);
+    if (!idDir.mkdir()) {
+      Assert.fail("Failed to create id directory");
+    }
+
+    Assert.assertNull(dataVolume.getDbParentDir());
+
+    // Restart DN and finalize upgrade
+    restartDatanode(
+        HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion(), true);
+    dsm.finalizeUpgrade();
+
+    // RocksDB is created by upgrade action
+    dataVolume = ((HddsVolume) dsm.getContainer().getVolumeSet()
+        .getVolumesList().get(0));
+    Assert.assertNotNull(dataVolume.getDbParentDir());
+    if (VersionedDatanodeFeatures.SchemaV3.isFinalizedAndEnabled(conf)) {
+      Assert.assertTrue(dataVolume.isDbLoaded());
+    } else {
+      Assert.assertFalse(dataVolume.isDbLoaded());
+    }
+  }
+
+  /**
+   * Test that finalizing twice won't recreate any RocksDB for an HddsVolume.
+   */
+  @Test
+  public void testFinalizeTwice() throws Exception {
+    // start DN and SCM
+    startScmServer();
+    // add one HddsVolume and two DbVolume
+    addHddsVolume();
+    addDbVolume();
+    addDbVolume();
+
+    startPreFinalizedDatanode();
+    dsm.finalizeUpgrade();
+
+    DbVolume dbVolume = ((HddsVolume) dsm.getContainer().getVolumeSet()
+        .getVolumesList().get(0)).getDbVolume();
+    Assert.assertNotNull(dbVolume);
+
+    dsm.finalizeUpgrade();
+    // DB Dir should be the same.
+    Assert.assertEquals(dbVolume, ((HddsVolume) dsm.getContainer()
+        .getVolumeSet().getVolumesList().get(0)).getDbVolume());
+  }
+
+  /**
+   * For a finalized cluster, add a new HddsVolume.
+   */
+  @Test
+  public void testAddHddsVolumeAfterFinalize() throws Exception {
+    // start DN and SCM
+    startScmServer();
+    addHddsVolume();
+
+    startPreFinalizedDatanode();
+    dsm.finalizeUpgrade();
+
+    // Add a new HddsVolume. It should have DB created after DN restart.
+    addHddsVolume();
+    restartDatanode(HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion(), true);
+    for (StorageVolume vol:
+        dsm.getContainer().getVolumeSet().getVolumesList()) {
+      HddsVolume hddsVolume = (HddsVolume) vol;
+      if (VersionedDatanodeFeatures.SchemaV3.isFinalizedAndEnabled(conf)) {
+        Assert.assertTrue(hddsVolume.isDbLoaded());
+        Assert.assertTrue(hddsVolume.getDbParentDir().getAbsolutePath()
+            .startsWith(hddsVolume.getStorageDir().getAbsolutePath()));
+      } else {
+        Assert.assertFalse(hddsVolume.isDbLoaded());
+      }
+    }
+  }
+
+  /**
+   * For a finalized cluster, add a new DbVolume.
+   */
+  @Test
+  public void testAddDbVolumeAfterFinalize() throws Exception {
+    startScmServer();
+    addHddsVolume();
+
+    startPreFinalizedDatanode();
+    HddsVolume hddsVolume = (HddsVolume) dsm.getContainer().getVolumeSet()
+        .getVolumesList().get(0);
+    Assert.assertNull(hddsVolume.getDbParentDir());
+    dsm.finalizeUpgrade();
+    // DB is created during upgrade
+    File dbDir = hddsVolume.getDbParentDir();
+    Assert.assertTrue(dbDir.getAbsolutePath().startsWith(
+        hddsVolume.getStorageDir().getAbsolutePath()));
+
+    // Add a new DbVolume
+    addDbVolume();
+    restartDatanode(HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion(), true);
+
+    // HddsVolume should still use the RocksDB under its own volume.
+    DbVolume dbVolume = (DbVolume) dsm.getContainer().getDbVolumeSet()
+        .getVolumesList().get(0);
+    Assert.assertEquals(0, dbVolume.getHddsVolumeIDs().size());
+
+    if (VersionedDatanodeFeatures.SchemaV3.isFinalizedAndEnabled(conf)) {
+      hddsVolume = (HddsVolume) dsm.getContainer().getVolumeSet()
+          .getVolumesList().get(0);
+      Assert.assertEquals(dbDir, hddsVolume.getDbParentDir());
+      Assert.assertTrue(hddsVolume.isDbLoaded());
+    }
+  }
+
+  /**
+   * For a finalized cluster, add a new DbVolume and a new HddsVolume.
+   */
+  @Test
+  public void testAddDbAndHddsVolumeAfterFinalize() throws Exception {
+    // start DN and SCM
+    startScmServer();
+    addHddsVolume();
+
+    startPreFinalizedDatanode();
+    dsm.finalizeUpgrade();
+
+    addDbVolume();
+    File newDataVolume = addHddsVolume();
+    restartDatanode(HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion(), true);
+
+    DbVolume dbVolume = (DbVolume) dsm.getContainer().getDbVolumeSet()
+        .getVolumesList().get(0);
+
+    for (StorageVolume vol:
+        dsm.getContainer().getVolumeSet().getVolumesList()) {
+      HddsVolume hddsVolume = (HddsVolume) vol;
+      File dbFile;
+      if (hddsVolume.getStorageDir().getAbsolutePath().startsWith(
+          newDataVolume.getAbsolutePath())) {
+        if (VersionedDatanodeFeatures.SchemaV3.isFinalizedAndEnabled(conf)) {
+          Assert.assertEquals(dbVolume, hddsVolume.getDbVolume());
+        }
+        // RocksDB of newly added HddsVolume is created on the newly added
+        // DbVolume
+        dbFile = new File(dbVolume.getStorageDir() + "/" +
+            hddsVolume.getClusterID() + "/" + hddsVolume.getStorageID());
+      } else {
+        Assert.assertNull(hddsVolume.getDbVolume());
+        dbFile = new File(hddsVolume.getStorageDir() + "/" +
+            hddsVolume.getClusterID() + "/" + hddsVolume.getStorageID());
+      }
+      if (VersionedDatanodeFeatures.SchemaV3.isFinalizedAndEnabled(conf)) {
+        Assert.assertTrue(hddsVolume.isDbLoaded());
+        Assert.assertTrue(hddsVolume.getDbParentDir().exists());
+        Assert.assertTrue(dbFile.exists());
+        Assert.assertEquals(dbFile, hddsVolume.getDbParentDir());
+      }
+    }
+  }
+
+  /**
+   * Test data write after finalization, with Schema V3 enabled.
+   */
+  @Test
+  public void testWriteWithV3Enabled() throws Exception {
+    testWrite(true, OzoneConsts.SCHEMA_V3);
+  }
+
+  /**
+   * Test data write after finalization, with Schema V3 disabled.
+   */
+  @Test
+  public void testWriteWithV3Disabled() throws Exception {
+    testWrite(false, OzoneConsts.SCHEMA_V2);
+  }
+
+  public void testWrite(boolean enable, String expectedVersion)
+      throws Exception {
+    // start DN and SCM
+    startScmServer();
+    addHddsVolume();
+    // Disable Schema V3
+    conf.setBoolean(DatanodeConfiguration.CONTAINER_SCHEMA_V3_ENABLED, false);
+    startPreFinalizedDatanode();
+    dsm.finalizeUpgrade();
+
+    final Pipeline pipeline = getPipeline();
+    // Create a container to write data.
+    final long containerID1 = addContainer(pipeline);
+    putBlock(containerID1, pipeline);
+    closeContainer(containerID1, pipeline);
+    KeyValueContainer container = (KeyValueContainer)
+        dsm.getContainer().getContainerSet().getContainer(containerID1);
+    // When SchemaV3 is disabled, new data should be saved as SchemaV2.
+    Assert.assertEquals(OzoneConsts.SCHEMA_V2,
+        container.getContainerData().getSchemaVersion());
+
+    // Set SchemaV3 enable status
+    conf.setBoolean(DatanodeConfiguration.CONTAINER_SCHEMA_V3_ENABLED,
+        enable);
+    restartDatanode(HDDSLayoutFeature.DATANODE_SCHEMA_V3.layoutVersion(), true);
+
+    // Write new data
+    final long containerID2 = addContainer(pipeline);
+    putBlock(containerID2, pipeline);
+    closeContainer(containerID2, pipeline);
+    container = (KeyValueContainer)
+        dsm.getContainer().getContainerSet().getContainer(containerID2);
+    // If SchemaV3 is enabled, new data should be saved as SchemaV3
+    // If SchemaV3 is still disabled, new data should still be saved as SchemaV2
+    Assert.assertEquals(expectedVersion,
+        container.getContainerData().getSchemaVersion());
+  }
+
+  /**
+   * Test data read during and after finalization.
+   */
+  @Test
+  public void testReadsDuringFinalize() throws Exception {
+    // start DN and SCM
+    startScmServer();
+    addHddsVolume();
+    startPreFinalizedDatanode();
+    final Pipeline pipeline = getPipeline();
+
+    // Add data to read.
+    final long containerID = addContainer(pipeline);
+    ContainerProtos.WriteChunkRequestProto writeChunk = putBlock(containerID,
+        pipeline);
+    closeContainer(containerID, pipeline);
+
+    // Create thread to keep reading during finalization.
+    ExecutorService executor = Executors.newFixedThreadPool(1);
+    Future<Void> readFuture = executor.submit(() -> {
+      // Layout version check should be thread safe.
+      while (!dsm.getLayoutVersionManager()
+          .isAllowed(HDDSLayoutFeature.DATANODE_SCHEMA_V3)) {
+        readChunk(writeChunk, pipeline);
+      }
+      // Make sure we can read after finalizing too.
+      readChunk(writeChunk, pipeline);
+      return null;
+    });
+
+    dsm.finalizeUpgrade();
+    // If there was a failure reading during the upgrade, the exception will
+    // be thrown here.
+    readFuture.get();
+    executor.shutdown();
+  }
+
+  /**
+   * Test finalization failure.
+   */
+  @Test
+  public void testFinalizeFailure() throws Exception {
+    // start DN and SCM
+    startScmServer();
+    addHddsVolume();
+    // Let HddsVolume be formatted to mimic the real cluster upgrade
+    // Set layout version.
+    DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
+        UUID.randomUUID().toString(),
+        HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
+    layoutStorage.initialize();
+    dsm = new DatanodeStateMachine(
+        ContainerTestUtils.createDatanodeDetails(), conf, null, null, null);
+    HddsVolume dataVolume = (HddsVolume) dsm.getContainer()
+        .getVolumeSet().getVolumesList().get(0);
+    // Format HddsVolume to mimic the real cluster upgrade situation
+    dataVolume.format(CLUSTER_ID);
+    File idDir = new File(dataVolume.getStorageDir(), CLUSTER_ID);
+    if (!idDir.mkdir()) {
+      Assert.fail("Failed to create id directory");
+    }
+    Assert.assertNull(dataVolume.getDbParentDir());
+
+    // Restart DN
+    restartDatanode(
+        HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion(), true);
+
+    // Write some data.
+    final Pipeline pipeline = getPipeline();
+    final long containerID = addContainer(pipeline);
+    ContainerProtos.WriteChunkRequestProto writeChunk = putBlock(containerID,
+        pipeline);
+    closeContainer(containerID, pipeline);
+    KeyValueContainer container = (KeyValueContainer)
+        dsm.getContainer().getContainerSet().getContainer(containerID);
+    Assert.assertEquals(OzoneConsts.SCHEMA_V2,
+        container.getContainerData().getSchemaVersion());
+
+
+    HddsVolume volume = Mockito.mock(HddsVolume.class);
+    Mockito.doThrow(new IOException("Failed to init DB")).when(volume).
+        createDbStore(anyObject());
+    Map<String, StorageVolume> volumeMap = new HashMap<>();
+    volumeMap.put(dataVolume.getStorageID(), volume);
+    dsm.getContainer().getVolumeSet().setVolumeMap(volumeMap);
+
+    // Finalize will fail because of DB creation failure
+    try {
+      dsm.finalizeUpgrade();
+    } catch (Exception e) {
+      // Currently finalization is retried on failure.
+      // Assume the retry is terminated by the user.
+    }
+
+    // Check that old data is readable
+    container = (KeyValueContainer)
+        dsm.getContainer().getContainerSet().getContainer(containerID);
+    Assert.assertEquals(OzoneConsts.SCHEMA_V2,
+        container.getContainerData().getSchemaVersion());
+    readChunk(writeChunk, pipeline);
+
+    // SchemaV3 is not finalized, so the MLV is still
+    // ERASURE_CODED_STORAGE_SUPPORT.
+    restartDatanode(
+        HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion(), true);
+
+    // Old data is readable after DN restart
+    container = (KeyValueContainer)
+        dsm.getContainer().getContainerSet().getContainer(containerID);
+    Assert.assertEquals(OzoneConsts.SCHEMA_V2,
+        container.getContainerData().getSchemaVersion());
+    readChunk(writeChunk, pipeline);
+  }
+
+  public void checkContainerPathID(long containerID, String expectedID) {
+    KeyValueContainerData data =
+        (KeyValueContainerData) dsm.getContainer().getContainerSet()
+            .getContainer(containerID).getContainerData();
+    Assert.assertTrue(data.getChunksPath().contains(expectedID));
+    Assert.assertTrue(data.getMetadataPath().contains(expectedID));
+  }
+
+  public List<File> getHddsSubdirs(File volume) {
+    File[] subdirsArray = getHddsRoot(volume).listFiles(File::isDirectory);
+    Assert.assertNotNull(subdirsArray);
+    return Arrays.asList(subdirsArray);
+  }
+
+  public File getHddsRoot(File volume) {
+    return new File(HddsVolumeUtil.getHddsRoot(volume.getAbsolutePath()));
+  }
+
+  /**
+   * Starts the datanode with the previous layout version, and calls the
+   * version endpoint task to get the cluster ID and SCM ID.
+   *
+   * The daemon for the datanode state machine is not started in this test.
+   * This greatly speeds up execution time.
+   * It means we do not have heartbeat functionality or pre-finalize
+   * upgrade actions, but neither of those things is needed for these tests.
+   */
+  public void startPreFinalizedDatanode() throws Exception {
+    // Set layout version.
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
+        tempFolder.getRoot().getAbsolutePath());
+    DatanodeLayoutStorage layoutStorage = new DatanodeLayoutStorage(conf,
+        UUID.randomUUID().toString(),
+        HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
+    layoutStorage.initialize();
+
+    // Build and start the datanode.
+    DatanodeDetails dd = ContainerTestUtils.createDatanodeDetails();
+    DatanodeStateMachine newDsm = new DatanodeStateMachine(dd,
+        conf, null, null, null);
+    int actualMlv = newDsm.getLayoutVersionManager().getMetadataLayoutVersion();
+    Assert.assertEquals(
+        HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion(),
+        actualMlv);
+    if (dsm != null) {
+      dsm.close();
+    }
+    dsm = newDsm;
+
+    callVersionEndpointTask();
+  }
+
+  public void restartDatanode(int expectedMlv, boolean exactMatch)
+      throws Exception {
+    // Stop existing datanode.
+    DatanodeDetails dd = dsm.getDatanodeDetails();
+    dsm.close();
+
+    // Start new datanode with the same configuration.
+    dsm = new DatanodeStateMachine(dd,
+        conf, null, null, null);
+    int mlv = dsm.getLayoutVersionManager().getMetadataLayoutVersion();
+    if (exactMatch) {
+      Assert.assertEquals(expectedMlv, mlv);
+    } else {
+      Assert.assertTrue("Expected minimum mlv(" + expectedMlv
+          + ") is smaller than mlv(" + mlv + ").", expectedMlv <= mlv);
+    }
+
+    callVersionEndpointTask();
+  }
+
+  /**
+   * Get the cluster ID and SCM ID from SCM and pass them to the datanode.
+   */
+  public void callVersionEndpointTask() throws Exception {
+    try (EndpointStateMachine esm = ContainerTestUtils.createEndpoint(conf,
+        address, 1000)) {
+      VersionEndpointTask vet = new VersionEndpointTask(esm, conf,
+          dsm.getContainer());
+      esm.setState(EndpointStateMachine.EndPointStates.GETVERSION);
+      vet.call();
+    }
+  }
+
+  public String startScmServer() throws IOException {
+    String scmID = UUID.randomUUID().toString();
+    scmServerImpl = new ScmTestMock(CLUSTER_ID, scmID);
+    scmRpcServer = SCMTestUtils.startScmRpcServer(conf,
+        scmServerImpl, address, 10);
+    return scmID;
+  }
+
+  /// CONTAINER OPERATIONS ///
+  public void readChunk(ContainerProtos.WriteChunkRequestProto writeChunk,
+      Pipeline pipeline)  throws Exception {
+    ContainerProtos.ContainerCommandRequestProto readChunkRequest =
+        ContainerTestHelper.getReadChunkRequest(pipeline, writeChunk);
+
+    dispatchRequest(readChunkRequest);
+  }
+
+  public ContainerProtos.WriteChunkRequestProto putBlock(long containerID,
+      Pipeline pipeline) throws Exception {
+    ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
+        getWriteChunk(containerID, pipeline);
+    dispatchRequest(writeChunkRequest);
+
+    ContainerProtos.ContainerCommandRequestProto putBlockRequest =
+        ContainerTestHelper.getPutBlockRequest(pipeline,
+            writeChunkRequest.getWriteChunk());
+    dispatchRequest(putBlockRequest);
+
+    return writeChunkRequest.getWriteChunk();
+  }
+
+  public ContainerProtos.ContainerCommandRequestProto getWriteChunk(
+      long containerID, Pipeline pipeline) throws Exception {
+    return ContainerTestHelper.getWriteChunkRequest(pipeline,
+            ContainerTestHelper.getTestBlockID(containerID), 100, null);
+  }
+
+  public Pipeline getPipeline() {
+    return MockPipeline.createPipeline(
+        Collections.singletonList(dsm.getDatanodeDetails()));
+  }
+
+  public long addContainer(Pipeline pipeline)
+      throws Exception {
+    long containerID = random.nextInt(Integer.MAX_VALUE);
+    ContainerProtos.ContainerCommandRequestProto createContainerRequest =
+        ContainerTestHelper.getCreateContainerRequest(containerID, pipeline);
+    dispatchRequest(createContainerRequest);
+
+    return containerID;
+  }
+
+  public void deleteContainer(long containerID, Pipeline pipeline)
+      throws Exception {
+    ContainerProtos.ContainerCommandRequestProto deleteContainerRequest =
+        ContainerTestHelper.getDeleteContainer(pipeline, containerID, true);
+    dispatchRequest(deleteContainerRequest);
+  }
+
+  public void closeContainer(long containerID, Pipeline pipeline)
+      throws Exception {
+    closeContainer(containerID, pipeline, ContainerProtos.Result.SUCCESS);
+  }
+
+  public void closeContainer(long containerID, Pipeline pipeline,
+      ContainerProtos.Result expectedResult) throws Exception {
+    ContainerProtos.ContainerCommandRequestProto closeContainerRequest =
+        ContainerTestHelper.getCloseContainer(pipeline, containerID);
+    dispatchRequest(closeContainerRequest, expectedResult);
+  }
+
+  public void dispatchRequest(
+      ContainerProtos.ContainerCommandRequestProto request) {
+    dispatchRequest(request, ContainerProtos.Result.SUCCESS);
+  }
+
+  public void dispatchRequest(
+      ContainerProtos.ContainerCommandRequestProto request,
+      ContainerProtos.Result expectedResult) {
+    ContainerProtos.ContainerCommandResponseProto response =
+        dsm.getContainer().getDispatcher().dispatch(request, null);
+    Assert.assertEquals(expectedResult, response.getResult());
+  }
+
+  /// VOLUME OPERATIONS ///
+
+  /**
+   * Append a datanode volume to the existing volumes in the configuration.
+   * @return The root directory for the new volume.
+   */
+  public File addHddsVolume() throws IOException {
+    File vol = tempFolder.newFolder(UUID.randomUUID().toString());
+    String[] existingVolumes =
+        conf.getStrings(ScmConfigKeys.HDDS_DATANODE_DIR_KEY);
+    List<String> allVolumes = new ArrayList<>();
+    if (existingVolumes != null) {
+      allVolumes.addAll(Arrays.asList(existingVolumes));
+    }
+
+    allVolumes.add(vol.getAbsolutePath());
+    conf.setStrings(ScmConfigKeys.HDDS_DATANODE_DIR_KEY,
+        allVolumes.toArray(new String[0]));
+
+    return vol;
+  }
+
+  /**
+   * Append a db volume to the existing volumes in the configuration.
+   * @return The root directory for the new volume.
+   */
+  public File addDbVolume() throws Exception {
+    File vol = tempFolder.newFolder(UUID.randomUUID().toString());
+    String[] existingVolumes =
+        conf.getStrings(OzoneConfigKeys.HDDS_DATANODE_CONTAINER_DB_DIR);
+    List<String> allVolumes = new ArrayList<>();
+    if (existingVolumes != null) {
+      allVolumes.addAll(Arrays.asList(existingVolumes));
+    }
+
+    allVolumes.add(vol.getAbsolutePath());
+    conf.setStrings(OzoneConfigKeys.HDDS_DATANODE_CONTAINER_DB_DIR,
+        allVolumes.toArray(new String[0]));
+
+    return vol;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org