You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by li...@apache.org on 2020/06/02 03:11:11 UTC

[hadoop-ozone] 02/07: HDDS-3187 Construct SCM StateMachine. (#819)

This is an automated email from the ASF dual-hosted git repository.

licheng pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit 4c035bb2473b1ebf9e87f596d832b6c5473ce62b
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Fri Apr 17 15:39:03 2020 +0800

    HDDS-3187 Construct SCM StateMachine. (#819)
    
    Co-authored-by: Li Cheng <ti...@tencent.com>
---
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   5 +
 .../java/org/apache/hadoop/hdds/scm/ScmUtils.java  |  12 ++
 .../hdds/scm/container/ContainerManager.java       |   5 +
 .../hdds/scm/container/SCMContainerManager.java    |  21 +-
 .../org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java  |  42 ++++
 .../apache/hadoop/hdds/scm/ha/SCMNodeDetails.java  |  18 +-
 .../hadoop/hdds/scm/ratis/SCMStateMachine.java     |  35 ---
 .../hdds/scm/server/StorageContainerManager.java   |  59 ++++-
 .../scm/{ => server}/ratis/SCMRatisServer.java     |  25 ++-
 .../scm/server/ratis/SCMRatisSnapshotInfo.java     | 179 +++++++++++++++
 .../hdds/scm/server/ratis/SCMStateMachine.java     | 240 +++++++++++++++++++++
 .../hdds/scm/{ => server}/ratis/package-info.java  |   2 +-
 .../scm/{ => server}/ratis/TestSCMRatisServer.java |  23 +-
 .../hdds/scm/server/ratis/TestSCMStateMachine.java | 120 +++++++++++
 14 files changed, 714 insertions(+), 72 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index bb8e145..a8b48ba 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -352,4 +352,9 @@ public final class OzoneConsts {
 
   // SCM HA
   public static final String SCM_SERVICE_ID_DEFAULT = "scmServiceIdDefault";
+
+  // SCM Ratis snapshot file to store the last applied index
+  public static final String SCM_RATIS_SNAPSHOT_INDEX = "scmRatisSnapshotIndex";
+
+  public static final String SCM_RATIS_SNAPSHOT_TERM = "scmRatisSnapshotTerm";
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
index 426341a..bb48654 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.hdds.scm.safemode.Precheck;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+
 /**
  * SCM utility class.
  */
@@ -48,4 +50,14 @@ public final class ScmUtils {
     }
   }
 
+  /**
+   * Create SCM directory file based on given path.
+   */
+  public static File createSCMDir(String dirPath) {
+    File dirFile = new File(dirPath);
+    if (!dirFile.mkdirs() && !dirFile.exists()) {
+      throw new IllegalArgumentException("Unable to create path: " + dirFile);
+    }
+    return dirFile;
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
index 43c1ced..f17a2f4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
@@ -189,4 +189,9 @@ public interface ContainerManager extends Closeable {
    * @param success
    */
   void notifyContainerReportProcessing(boolean isFullReport, boolean success);
+
+  /**
+   * Flush metadata of container manager if they are required to be persisted.
+   */
+  void flushDB() throws IOException;
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
index 9f47608..ee8c689 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
@@ -365,13 +365,20 @@ public class SCMContainerManager implements ContainerManager {
     }
   }
 
-  /**
-   * Update deleteTransactionId according to deleteTransactionMap.
-   *
-   * @param deleteTransactionMap Maps the containerId to latest delete
-   *                             transaction id for the container.
-   * @throws IOException
-   */
+  @Override
+  public void flushDB() throws IOException {
+    if (containerStore != null) {
+      containerStore.flushDB(true);
+    }
+  }
+
+    /**
+     * Update deleteTransactionId according to deleteTransactionMap.
+     *
+     * @param deleteTransactionMap Maps the containerId to latest delete
+     *                             transaction id for the container.
+     * @throws IOException
+     */
   public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
       throws IOException {
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
index c0364ad..eb22566 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
@@ -18,8 +18,15 @@
 
 package org.apache.hadoop.hdds.scm.ha;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.ScmUtils;
+import org.apache.hadoop.hdds.scm.server.ratis.SCMRatisServer;
+
+import java.io.File;
+import java.util.Collection;
 
 /**
  * Utility class used by SCM HA.
@@ -34,4 +41,39 @@ public final class SCMHAUtils {
     return conf.getBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY,
         ScmConfigKeys.OZONE_SCM_HA_ENABLE_DEFAULT);
   }
+
+  public static File createSCMRatisDir(OzoneConfiguration conf)
+      throws  IllegalArgumentException {
+    String scmRatisDir = SCMRatisServer.getSCMRatisDirectory(conf);
+    if (scmRatisDir == null || scmRatisDir.isEmpty()) {
+      throw new IllegalArgumentException(HddsConfigKeys.OZONE_METADATA_DIRS +
+          " must be defined.");
+    }
+    return ScmUtils.createSCMDir(scmRatisDir);
+  }
+
+  /**
+   * Get a collection of all scmNodeIds for the given scmServiceId.
+   */
+  public static Collection<String> getSCMNodeIds(Configuration conf,
+                                                 String scmServiceId) {
+    String key = addSuffix(ScmConfigKeys.OZONE_SCM_NODES_KEY, scmServiceId);
+    return conf.getTrimmedStringCollection(key);
+  }
+
+  public static String  getLocalSCMNodeId(String scmServiceId) {
+    return addSuffix(ScmConfigKeys.OZONE_SCM_NODES_KEY, scmServiceId);
+  }
+
+  /**
+   * Add non empty and non null suffix to a key.
+   */
+  private static String addSuffix(String key, String suffix) {
+    if (suffix == null || suffix.isEmpty()) {
+      return key;
+    }
+    assert !suffix.startsWith(".") :
+        "suffix '" + suffix + "' should not already have '.' prepended.";
+    return key + "." + suffix;
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java
index 8d66187..2390cb3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdds.scm.ha;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.ozone.OzoneConsts;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,6 +28,7 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_INTERNAL_SERVICE_ID;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_SERVICE_IDS_KEY;
 
 /**
  * Construct SCM node details.
@@ -153,6 +153,18 @@ public final class SCMNodeDetails {
   public static SCMNodeDetails initStandAlone(
       OzoneConfiguration conf) throws IOException {
     String localSCMServiceId = conf.getTrimmed(OZONE_SCM_INTERNAL_SERVICE_ID);
+    if (localSCMServiceId == null) {
+      // There is no internal om service id is being set, fall back to ozone
+      // .om.service.ids.
+      LOG.info("{} is not defined, falling back to {} to find serviceID for "
+              + "SCM if it is HA enabled cluster",
+          OZONE_SCM_INTERNAL_SERVICE_ID, OZONE_SCM_SERVICE_IDS_KEY);
+      localSCMServiceId = conf.getTrimmed(
+          OZONE_SCM_SERVICE_IDS_KEY);
+    } else {
+      LOG.info("ServiceID for SCM is {}", localSCMServiceId);
+    }
+    String localSCMNodeId = SCMHAUtils.getLocalSCMNodeId(localSCMServiceId);
     int ratisPort = conf.getInt(
         ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY,
         ScmConfigKeys.OZONE_SCM_RATIS_PORT_DEFAULT);
@@ -161,8 +173,8 @@ public final class SCMNodeDetails {
     SCMNodeDetails scmNodeDetails = new SCMNodeDetails.Builder()
         .setRatisPort(ratisPort)
         .setRpcAddress(rpcAddress)
-        .setSCMNodeId(localSCMServiceId)
-        .setSCMServiceId(OzoneConsts.SCM_SERVICE_ID_DEFAULT)
+        .setSCMNodeId(localSCMNodeId)
+        .setSCMServiceId(localSCMServiceId)
         .build();
     return scmNodeDetails;
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMStateMachine.java
deleted file mode 100644
index 502260a..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMStateMachine.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm.ratis;
-
-import org.apache.ratis.statemachine.impl.BaseStateMachine;
-
-/**
- * Class for SCM StateMachine.
- */
-public class SCMStateMachine extends BaseStateMachine {
-  //TODO to be implemented
-  public SCMStateMachine(SCMRatisServer ratisServer) {
-
-  }
-
-  public void stop() {
-    return;
-  }
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 6f978bd..b8527bc 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -25,6 +25,13 @@ import javax.management.ObjectName;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.protobuf.BlockingService;
+
+import java.io.File;
 import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
 import java.util.Collection;
@@ -45,7 +52,8 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
 import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
-import org.apache.hadoop.hdds.scm.ratis.SCMRatisServer;
+import org.apache.hadoop.hdds.scm.server.ratis.SCMRatisServer;
+import org.apache.hadoop.hdds.scm.server.ratis.SCMRatisSnapshotInfo;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -114,13 +122,9 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.util.JvmPauseMonitor;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.protobuf.BlockingService;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT_DEFAULT;
 import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.ratis.server.protocol.TermIndex;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -195,6 +199,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
 
   // SCM HA related
   private SCMRatisServer scmRatisServer;
+  private SCMRatisSnapshotInfo scmRatisSnapshotInfo;
+  private File scmRatisSnapshotDir;
 
   private JvmPauseMonitor jvmPauseMonitor;
   private final OzoneConfiguration configuration;
@@ -264,6 +270,9 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     }
 
     if (SCMHAUtils.isSCMHAEnabled(conf)) {
+      this.scmRatisSnapshotInfo = new SCMRatisSnapshotInfo(
+          scmStorageConfig.getCurrentDir());
+      this.scmRatisSnapshotDir = SCMHAUtils.createSCMRatisDir(conf);
       initializeRatisServer();
     } else {
       scmRatisServer = null;
@@ -796,6 +805,10 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
           getClientRpcAddress()));
     }
 
+    if (scmRatisServer != null) {
+      scmRatisServer.start();
+    }
+
     ms = HddsServerUtil
         .initializeMetrics(configuration, "StorageContainerManager");
 
@@ -1137,4 +1150,38 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
       }
     }
   }
+
+  @VisibleForTesting
+  public SCMRatisServer getScmRatisServer() {
+    return scmRatisServer;
+  }
+
+  @VisibleForTesting
+  public SCMRatisSnapshotInfo getSnapshotInfo() {
+    return scmRatisSnapshotInfo;
+  }
+
+  @VisibleForTesting
+  public long getRatisSnapshotIndex() {
+    return scmRatisSnapshotInfo.getIndex();
+  }
+
+  /**
+   * Save ratis snapshot to SCM meta store and local disk.
+   */
+  public TermIndex saveRatisSnapshot() throws IOException {
+    TermIndex snapshotIndex = scmRatisServer.getLastAppliedTermIndex();
+    if (scmMetadataStore != null) {
+      // Flush the SCM state to disk
+      scmMetadataStore.getStore().flush();
+    }
+
+    if (containerManager != null) {
+      containerManager.flushDB();
+    }
+
+    scmRatisSnapshotInfo.saveRatisSnapshotToDisk(snapshotIndex);
+
+    return snapshotIndex;
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMRatisServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMRatisServer.java
similarity index 97%
rename from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMRatisServer.java
rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMRatisServer.java
index af1e5c2..77dee6a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMRatisServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMRatisServer.java
@@ -16,7 +16,7 @@
  *  limitations under the License.
  */
 
-package org.apache.hadoop.hdds.scm.ratis;
+package org.apache.hadoop.hdds.scm.server.ratis;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
@@ -42,6 +42,7 @@ import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.SizeInBytes;
@@ -94,6 +95,10 @@ public final class SCMRatisServer {
     return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
   }
 
+  /**
+   * Creates a SCM Ratis Server.
+   * @throws IOException
+   */
   private SCMRatisServer(Configuration conf,
                          StorageContainerManager scm,
                          String raftGroupIdStr, RaftPeerId localRaftPeerId,
@@ -139,6 +144,9 @@ public final class SCMRatisServer {
     }, roleCheckInitialDelayMs, roleCheckIntervalMs, TimeUnit.MILLISECONDS);
   }
 
+  /**
+   * Create a SCM Ratis Server instance.
+   */
   public static SCMRatisServer newSCMRatisServer(
       Configuration conf, StorageContainerManager scm,
       SCMNodeDetails scmNodeDetails, List<SCMNodeDetails> peers)
@@ -178,7 +186,7 @@ public final class SCMRatisServer {
     return new SCMStateMachine(this);
   }
 
-  private RaftProperties newRaftProperties(Configuration conf){
+  private RaftProperties newRaftProperties(Configuration conf) {
     final RaftProperties properties = new RaftProperties();
     // Set RPC type
     final String rpcType = conf.get(
@@ -403,6 +411,15 @@ public final class SCMRatisServer {
     }
   }
 
+  public StorageContainerManager getSCM() {
+    return scm;
+  }
+
+  @VisibleForTesting
+  public SCMStateMachine getScmStateMachine() {
+    return scmStateMachine;
+  }
+
   public int getServerPort() {
     return port;
   }
@@ -441,6 +458,10 @@ public final class SCMRatisServer {
     }
   }
 
+  public TermIndex getLastAppliedTermIndex() {
+    return scmStateMachine.getLastAppliedTermIndex();
+  }
+
   private GroupInfoReply getGroupInfo() throws IOException {
     GroupInfoRequest groupInfoRequest = new GroupInfoRequest(clientId,
         raftPeerId, raftGroupId, nextCallId());
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMRatisSnapshotInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMRatisSnapshotInfo.java
new file mode 100644
index 0000000..11b3234
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMRatisSnapshotInfo.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.server.ratis;
+
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.DumperOptions;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.List;
+
+import static org.apache.hadoop.ozone.OzoneConsts.SCM_RATIS_SNAPSHOT_INDEX;
+
+/**
+ * This class captures the snapshotIndex and term of the latest snapshot in
+ * the SCM.
+ * Ratis server loads the snapshotInfo during startup and updates the
+ * lastApplied index to this snapshotIndex. OM SnapshotInfo does not contain
+ * any files. It is used only to store/ update the last applied index and term.
+ */
+public class SCMRatisSnapshotInfo implements SnapshotInfo {
+
+  static final Logger LOG = LoggerFactory.getLogger(SCMRatisSnapshotInfo.class);
+
+  private volatile long term = 0;
+  private volatile long snapshotIndex = -1;
+
+  private final File ratisSnapshotFile;
+
+  public SCMRatisSnapshotInfo(File ratisDir) throws IOException {
+    ratisSnapshotFile = new File(ratisDir, SCM_RATIS_SNAPSHOT_INDEX);
+    loadRatisSnapshotIndex();
+  }
+
+  public void updateTerm(long newTerm) {
+    term = newTerm;
+  }
+
+  private void updateTermIndex(long newTerm, long newIndex) {
+    this.term = newTerm;
+    this.snapshotIndex = newIndex;
+  }
+
+  /**
+   * Load the snapshot index and term from the snapshot file on disk,
+   * if it exists.
+   * @throws IOException
+   */
+  private void loadRatisSnapshotIndex() throws IOException {
+    if (ratisSnapshotFile.exists()) {
+      RatisSnapshotYaml ratisSnapshotYaml = readRatisSnapshotYaml();
+      updateTermIndex(ratisSnapshotYaml.term, ratisSnapshotYaml.snapshotIndex);
+    }
+  }
+
+  /**
+   * Read and parse the snapshot yaml file.
+   */
+  private RatisSnapshotYaml readRatisSnapshotYaml() throws IOException {
+    try (FileInputStream inputFileStream = new FileInputStream(
+        ratisSnapshotFile)) {
+      Yaml yaml = new Yaml();
+      try {
+        return yaml.loadAs(inputFileStream, RatisSnapshotYaml.class);
+      } catch (Exception e) {
+        throw new IOException("Unable to parse RatisSnapshot yaml file.", e);
+      }
+    }
+  }
+
+  /**
+   * Update and persist the snapshot index and term to disk.
+   * @param lastAppliedTermIndex new snapshot index to be persisted to disk.
+   * @throws IOException
+   */
+  public void saveRatisSnapshotToDisk(TermIndex lastAppliedTermIndex)
+      throws IOException {
+    updateTermIndex(lastAppliedTermIndex.getTerm(),
+        lastAppliedTermIndex.getIndex());
+    writeRatisSnapshotYaml();
+    LOG.info("Saved Ratis Snapshot on the SCM with snapshotIndex {}",
+        lastAppliedTermIndex);
+  }
+
+  /**
+   * Write snapshot details to disk in yaml format.
+   */
+  private void writeRatisSnapshotYaml() throws IOException {
+    DumperOptions options = new DumperOptions();
+    options.setPrettyFlow(true);
+    options.setDefaultFlowStyle(DumperOptions.FlowStyle.FLOW);
+    Yaml yaml = new Yaml(options);
+
+    RatisSnapshotYaml ratisSnapshotYaml = new RatisSnapshotYaml(term,
+        snapshotIndex);
+
+    try (Writer writer = new OutputStreamWriter(
+        new FileOutputStream(ratisSnapshotFile), "UTF-8")) {
+      yaml.dump(ratisSnapshotYaml, writer);
+    }
+  }
+
+  @Override
+  public TermIndex getTermIndex() {
+    return TermIndex.newTermIndex(term, snapshotIndex);
+  }
+
+  @Override
+  public long getTerm() {
+    return term;
+  }
+
+  @Override
+  public long getIndex() {
+    return snapshotIndex;
+  }
+
+  @Override
+  public List<FileInfo> getFiles() {
+    return null;
+  }
+
+  /**
+   * Ratis Snapshot details to be written to the yaml file.
+   */
+  public static class RatisSnapshotYaml {
+    private long term;
+    private long snapshotIndex;
+
+    public RatisSnapshotYaml() {
+      // Needed for snake-yaml introspection.
+    }
+
+    RatisSnapshotYaml(long term, long snapshotIndex) {
+      this.term = term;
+      this.snapshotIndex = snapshotIndex;
+    }
+
+    public void setTerm(long term) {
+      this.term = term;
+    }
+
+    public long getTerm() {
+      return this.term;
+    }
+
+    public void setSnapshotIndex(long index) {
+      this.snapshotIndex = index;
+    }
+
+    public long getSnapshotIndex() {
+      return this.snapshotIndex;
+    }
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMStateMachine.java
new file mode 100644
index 0000000..b60570b
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMStateMachine.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.server.ratis;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
+import org.apache.ratis.util.LifeCycle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class for SCM StateMachine.
+ */
+public class SCMStateMachine extends BaseStateMachine {
+  static final Logger LOG =
+      LoggerFactory.getLogger(SCMStateMachine.class);
+  private final SimpleStateMachineStorage storage =
+      new SimpleStateMachineStorage();
+  private final SCMRatisServer scmRatisServer;
+  private final StorageContainerManager scm;
+  private RaftGroupId raftGroupId;
+  private final SCMRatisSnapshotInfo snapshotInfo;
+  private final ExecutorService executorService;
+  private final ExecutorService installSnapshotExecutor;
+
+  // Map which contains index and term for the ratis transactions which are
+  // stateMachine entries which are recived through applyTransaction.
+  private ConcurrentMap<Long, Long> applyTransactionMap =
+      new ConcurrentSkipListMap<>();
+
+  /**
+   * Create a SCM state machine.
+   */
+  public SCMStateMachine(SCMRatisServer ratisServer) {
+    this.scmRatisServer = ratisServer;
+    this.scm = ratisServer.getSCM();
+
+    this.snapshotInfo = scm.getSnapshotInfo();
+    updateLastAppliedIndexWithSnaphsotIndex();
+
+    ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat("SCM StateMachine ApplyTransaction Thread - %d").build();
+    this.executorService = HadoopExecutors.newSingleThreadExecutor(build);
+    this.installSnapshotExecutor = HadoopExecutors.newSingleThreadExecutor();
+  }
+
+  /**
+   * Initializes the State Machine with the given server, group and storage.
+   */
+  @Override
+  public void initialize(RaftServer server, RaftGroupId id,
+                         RaftStorage raftStorage) throws IOException {
+    lifeCycle.startAndTransition(() -> {
+      super.initialize(server, id, raftStorage);
+      this.raftGroupId = id;
+      storage.init(raftStorage);
+    });
+  }
+
+  /**
+   * Pre-execute the update request into state machine.
+   */
+  @Override
+  public TransactionContext startTransaction(
+      RaftClientRequest raftClientRequest) {
+    return TransactionContext.newBuilder()
+        .setClientRequest(raftClientRequest)
+        .setStateMachine(this)
+        .setServerRole(RaftProtos.RaftPeerRole.LEADER)
+        .setLogData(raftClientRequest.getMessage().getContent())
+        .build();
+  }
+
+  /**
+   * Apply a committed log entry to state machine.
+   */
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+    CompletableFuture<Message> ratisFuture =
+        new CompletableFuture<>();
+    //TODO execute SCMRequest and process SCMResponse
+    return ratisFuture;
+  }
+
+  /**
+   * Query state machine.
+   */
+  @Override
+  public CompletableFuture<Message> query(Message request) {
+    //TODO make handler respond to the query request.
+    return CompletableFuture.completedFuture(request);
+  }
+
+  /**
+   * Pause state machine.
+   */
+  @Override
+  public void pause() {
+    lifeCycle.transition(LifeCycle.State.PAUSING);
+    lifeCycle.transition(LifeCycle.State.PAUSED);
+  }
+
+  /**
+   * Unpause state machine and update the lastAppliedIndex.
+   * Following after uploading new state to state machine.
+   */
+  public void unpause(long newLastAppliedSnaphsotIndex,
+                      long newLastAppliedSnapShotTermIndex) {
+    lifeCycle.startAndTransition(() -> {
+      this.setLastAppliedTermIndex(TermIndex.newTermIndex(
+          newLastAppliedSnapShotTermIndex, newLastAppliedSnaphsotIndex));
+    });
+  }
+
+  /**
+   * Take SCM snapshot and write index to file.
+   * @return actual index or 0 if error.
+   */
+  @Override
+  public long takeSnapshot() throws IOException {
+    LOG.info("Saving Ratis snapshot on the SCM.");
+    if (scm != null) {
+      return scm.saveRatisSnapshot().getIndex();
+    }
+    return 0;
+  }
+
+  /**
+   * Get latest SCM snapshot.
+   */
+  @Override
+  public SnapshotInfo getLatestSnapshot() {
+    return snapshotInfo;
+  }
+
+  private synchronized void updateLastApplied() {
+    Long appliedTerm = null;
+    long appliedIndex = -1;
+    for(long i = getLastAppliedTermIndex().getIndex() + 1;; i++) {
+      final Long removed = applyTransactionMap.remove(i);
+      if (removed == null) {
+        break;
+      }
+      appliedTerm = removed;
+      appliedIndex = i;
+    }
+    if (appliedTerm != null) {
+      updateLastAppliedTermIndex(appliedTerm, appliedIndex);
+    }
+  }
+
+  /**
+   * Called to notify state machine about indexes which are processed
+   * internally by Raft Server, this currently happens when conf entries are
+   * processed in raft Server. This keep state machine to keep a track of index
+   * updates.
+   */
+  @Override
+  public void notifyIndexUpdate(long currentTerm, long index) {
+    applyTransactionMap.put(index, currentTerm);
+    updateLastApplied();
+    snapshotInfo.updateTerm(currentTerm);
+  }
+
+  /**
+   * Notifies the state machine that the raft peer is no longer leader.
+   */
+  @Override
+  public void notifyNotLeader(Collection<TransactionContext> pendingEntries) {
+    scmRatisServer.updateServerRole();
+  }
+
+  /**
+   * Transfer from log entry to string.
+   */
+  @Override
+  public String toStateMachineLogEntryString(
+      RaftProtos.StateMachineLogEntryProto proto) {
+    //TODO implement transfer from proto to SCMRequest body.
+    return null;
+  }
+
+  /**
+   * Update lastAppliedIndex term in snapshot info.
+   */
+  public void updateLastAppliedIndexWithSnaphsotIndex() {
+    setLastAppliedTermIndex(TermIndex.newTermIndex(snapshotInfo.getTerm(),
+        snapshotInfo.getIndex()));
+    LOG.info("LastAppliedIndex set from SnapShotInfo {}",
+        getLastAppliedTermIndex());
+  }
+
+  @VisibleForTesting
+  void addApplyTransactionTermIndex(long term, long index) {
+    applyTransactionMap.put(index, term);
+  }
+
+  public void stop() {
+    HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS);
+    HadoopExecutors.shutdown(installSnapshotExecutor, LOG, 5, TimeUnit.SECONDS);
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/package-info.java
similarity index 94%
rename from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/package-info.java
rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/package-info.java
index 4944017..77f4afa 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/package-info.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/package-info.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdds.scm.ratis;
+package org.apache.hadoop.hdds.scm.server.ratis;
 
 /**
  * This package contains classes related to Apache Ratis for SCM.
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ratis/TestSCMRatisServer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/ratis/TestSCMRatisServer.java
similarity index 86%
rename from hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ratis/TestSCMRatisServer.java
rename to hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/ratis/TestSCMRatisServer.java
index f29fb5f..d6981d3 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ratis/TestSCMRatisServer.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/ratis/TestSCMRatisServer.java
@@ -16,7 +16,7 @@
  *  limitations under the License.
  */
 
-package org.apache.hadoop.hdds.scm.ratis;
+package org.apache.hadoop.hdds.scm.server.ratis;
 
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -25,7 +25,6 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.util.LifeCycle;
@@ -59,7 +58,6 @@ public class TestSCMRatisServer {
   private SCMRatisServer scmRatisServer;
   private StorageContainerManager scm;
   private String scmId;
-  private SCMNodeDetails scmNodeDetails;
   private static final long LEADER_ELECTION_TIMEOUT = 500L;
 
   @Before
@@ -69,25 +67,14 @@ public class TestSCMRatisServer {
     conf.setTimeDuration(
         ScmConfigKeys.OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
         LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS);
-    int ratisPort = conf.getInt(
-        ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY,
-        ScmConfigKeys.OZONE_SCM_RATIS_PORT_DEFAULT);
-    InetSocketAddress rpcAddress = new InetSocketAddress(
-        InetAddress.getLocalHost(), 0);
-    scmNodeDetails = new SCMNodeDetails.Builder()
-        .setRatisPort(ratisPort)
-        .setRpcAddress(rpcAddress)
-        .setSCMNodeId(scmId)
-        .setSCMServiceId(OzoneConsts.SCM_SERVICE_ID_DEFAULT)
-        .build();
+    conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true);
+    conf.set(ScmConfigKeys.OZONE_SCM_INTERNAL_SERVICE_ID, "scm-ha-test");
 
     // Standalone SCM Ratis server
     initSCM();
     scm = HddsTestUtils.getScm(conf);
     scm.start();
-    scmRatisServer = SCMRatisServer.newSCMRatisServer(
-        conf, scm, scmNodeDetails, Collections.EMPTY_LIST);
-    scmRatisServer.start();
+    scmRatisServer = scm.getScmRatisServer();
   }
 
   @After
@@ -101,7 +88,7 @@ public class TestSCMRatisServer {
   }
 
   @Test
-  public void testStartSCMRatisServer() throws Exception {
+  public void testStartSCMRatisServer() {
     Assert.assertEquals("Ratis Server should be in running state",
         LifeCycle.State.RUNNING, scmRatisServer.getServerState());
   }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/ratis/TestSCMStateMachine.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/ratis/TestSCMStateMachine.java
new file mode 100644
index 0000000..69bc5bd
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/ratis/TestSCMStateMachine.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.server.ratis;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+
+/**
+ * Test class for SCMStateMachine.
+ */
+public class TestSCMStateMachine {
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  private SCMStateMachine scmStateMachine;
+  private StorageContainerManager scm;
+  private SCMRatisServer scmRatisServer;
+  private OzoneConfiguration conf;
+  private String scmId;
+  @Before
+  public void init() throws Exception {
+    conf = new OzoneConfiguration();
+    conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true);
+    conf.set(ScmConfigKeys.OZONE_SCM_INTERNAL_SERVICE_ID, "scm-ha-test");
+    scmId = UUID.randomUUID().toString();
+
+    initSCM();
+    scm = HddsTestUtils.getScm(conf);
+    scm.start();
+    scmRatisServer = scm.getScmRatisServer();
+    scmStateMachine = scm.getScmRatisServer().getScmStateMachine();
+  }
+
+  @Test
+  public void testSCMUpdatedAppliedIndex(){
+    // State machine should start with 0 term and 0 index.
+    scmStateMachine.notifyIndexUpdate(0, 0);
+    Assert.assertEquals(0,
+        scmStateMachine.getLastAppliedTermIndex().getTerm());
+    Assert.assertEquals(0,
+        scmStateMachine.getLastAppliedTermIndex().getIndex());
+
+    // If only the transactionMap is updated, index should stay 0.
+    scmStateMachine.addApplyTransactionTermIndex(0, 1);
+    Assert.assertEquals(0L,
+        scmStateMachine.getLastAppliedTermIndex().getTerm());
+    Assert.assertEquals(0L,
+        scmStateMachine.getLastAppliedTermIndex().getIndex());
+
+    // After the index update is notified, the index should increase.
+    scmStateMachine.notifyIndexUpdate(0, 1);
+    Assert.assertEquals(0L,
+        scmStateMachine.getLastAppliedTermIndex().getTerm());
+    Assert.assertEquals(1L,
+        scmStateMachine.getLastAppliedTermIndex().getIndex());
+
+    // Only do a notifyIndexUpdate can also increase the index.
+    scmStateMachine.notifyIndexUpdate(0, 2);
+    Assert.assertEquals(0L,
+        scmStateMachine.getLastAppliedTermIndex().getTerm());
+    Assert.assertEquals(2L,
+        scmStateMachine.getLastAppliedTermIndex().getIndex());
+
+    // If a larger index is notified, the index should not be updated.
+    scmStateMachine.notifyIndexUpdate(0, 5);
+    Assert.assertEquals(0L,
+        scmStateMachine.getLastAppliedTermIndex().getTerm());
+    Assert.assertEquals(2L,
+        scmStateMachine.getLastAppliedTermIndex().getIndex());
+  }
+
+  private void initSCM() throws IOException {
+    String clusterId = UUID.randomUUID().toString();
+    final String path = folder.newFolder().toString();
+    Path scmPath = Paths.get(path, "scm-meta");
+    Files.createDirectories(scmPath);
+    conf.set(OZONE_METADATA_DIRS, scmPath.toString());
+    SCMStorageConfig scmStore = new SCMStorageConfig(conf);
+    scmStore.setClusterId(clusterId);
+    scmStore.setScmId(scmId);
+    // writes the version file properties
+    scmStore.initialize();
+  }
+
+  @After
+  public void cleanup() {
+    scm.stop();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org