You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sa...@apache.org on 2020/03/31 12:16:28 UTC
[hadoop-ozone] 01/01: HDDS-3185 Construct a standalone ratis server
for SCM. (#720)
This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit d156eae915329858f4e7b4e1686a8d86aeefa641
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Fri Mar 27 17:22:40 2020 +0800
HDDS-3185 Construct a standalone ratis server for SCM. (#720)
Contributed-by: Li Cheng <ti...@tencent.com>
---
.../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 87 ++++
.../java/org/apache/hadoop/ozone/OzoneConsts.java | 3 +-
.../common/src/main/resources/ozone-default.xml | 180 ++++++++
.../org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java | 37 ++
.../apache/hadoop/hdds/scm/ha/SCMNodeDetails.java | 169 ++++++++
.../apache/hadoop/hdds/scm/ha/package-info.java | 22 +
.../hadoop/hdds/scm/ratis/SCMRatisServer.java | 461 +++++++++++++++++++++
.../hadoop/hdds/scm/ratis/SCMStateMachine.java | 35 ++
.../hdds/scm/server/StorageContainerManager.java | 27 ++
.../hadoop/hdds/scm/ratis/TestSCMRatisServer.java | 158 +++++++
.../hadoop/ozone/TestOzoneConfigurationFields.java | 2 +
11 files changed, 1180 insertions(+), 1 deletion(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 4eda6f8..6f060e2 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -274,6 +274,16 @@ public final class ScmConfigKeys {
// able to send back a new list to the datanodes.
public static final String OZONE_SCM_NAMES = "ozone.scm.names";
+ public static final String OZONE_SCM_INTERNAL_SERVICE_ID =
+ "ozone.scm.internal.service.id";
+
+ public static final String OZONE_SCM_SERVICE_IDS_KEY =
+ "ozone.scm.service.ids";
+ public static final String OZONE_SCM_NODES_KEY =
+ "ozone.scm.nodes";
+ public static final String OZONE_SCM_NODE_ID_KEY =
+ "ozone.scm.node.id";
+
public static final int OZONE_SCM_DEFAULT_PORT =
OZONE_SCM_DATANODE_PORT_DEFAULT;
// The path where datanode ID is to be written to.
@@ -363,6 +373,83 @@ public final class ScmConfigKeys {
public static final String HDDS_TRACING_ENABLED = "hdds.tracing.enabled";
public static final boolean HDDS_TRACING_ENABLED_DEFAULT = true;
+ // SCM Ratis related
+ public static final String OZONE_SCM_HA_ENABLE_KEY
+ = "ozone.scm.ratis.enable";
+ public static final boolean OZONE_SCM_HA_ENABLE_DEFAULT
+ = false;
+ // Port of the SCM Ratis server. The default has to stay in sync with the
+ // value declared for ozone.scm.ratis.port in ozone-default.xml.
+ public static final String OZONE_SCM_RATIS_PORT_KEY
+     = "ozone.scm.ratis.port";
+ public static final int OZONE_SCM_RATIS_PORT_DEFAULT
+     = 9872;
+ public static final String OZONE_SCM_RATIS_RPC_TYPE_KEY
+ = "ozone.scm.ratis.rpc.type";
+ public static final String OZONE_SCM_RATIS_RPC_TYPE_DEFAULT
+ = "GRPC";
+
+ // SCM Ratis Log configurations
+ public static final String OZONE_SCM_RATIS_STORAGE_DIR
+ = "ozone.scm.ratis.storage.dir";
+ public static final String OZONE_SCM_RATIS_SEGMENT_SIZE_KEY
+ = "ozone.scm.ratis.segment.size";
+ public static final String OZONE_SCM_RATIS_SEGMENT_SIZE_DEFAULT
+ = "16KB";
+ public static final String OZONE_SCM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY
+ = "ozone.scm.ratis.segment.preallocated.size";
+ public static final String OZONE_SCM_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT
+ = "16KB";
+
+ // SCM Ratis Log Appender configurations
+ public static final String
+ OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS =
+ "ozone.scm.ratis.log.appender.queue.num-elements";
+ public static final int
+ OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS_DEFAULT = 1024;
+ public static final String OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT =
+ "ozone.scm.ratis.log.appender.queue.byte-limit";
+ public static final String
+ OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT = "32MB";
+ public static final String OZONE_SCM_RATIS_LOG_PURGE_GAP =
+ "ozone.scm.ratis.log.purge.gap";
+ public static final int OZONE_SCM_RATIS_LOG_PURGE_GAP_DEFAULT = 1000000;
+
+ // SCM Ratis server configurations
+ public static final String OZONE_SCM_RATIS_SERVER_REQUEST_TIMEOUT_KEY
+ = "ozone.scm.ratis.server.request.timeout";
+ public static final TimeDuration
+ OZONE_SCM_RATIS_SERVER_REQUEST_TIMEOUT_DEFAULT
+ = TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS);
+ public static final String
+ OZONE_SCM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_KEY
+ = "ozone.scm.ratis.server.retry.cache.timeout";
+ public static final TimeDuration
+ OZONE_SCM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DEFAULT
+ = TimeDuration.valueOf(600000, TimeUnit.MILLISECONDS);
+ public static final String OZONE_SCM_RATIS_MINIMUM_TIMEOUT_KEY
+ = "ozone.scm.ratis.minimum.timeout";
+ public static final TimeDuration OZONE_SCM_RATIS_MINIMUM_TIMEOUT_DEFAULT
+ = TimeDuration.valueOf(1, TimeUnit.SECONDS);
+
+ // SCM Ratis Leader Election configurations
+ public static final String
+ OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY =
+ "ozone.scm.leader.election.minimum.timeout.duration";
+ public static final TimeDuration
+ OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT =
+ TimeDuration.valueOf(1, TimeUnit.SECONDS);
+ public static final String OZONE_SCM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY
+ = "ozone.scm.ratis.server.failure.timeout.duration";
+ public static final TimeDuration
+ OZONE_SCM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT
+ = TimeDuration.valueOf(120, TimeUnit.SECONDS);
+
+ // SCM Leader server role check interval
+ public static final String OZONE_SCM_RATIS_SERVER_ROLE_CHECK_INTERVAL_KEY
+ = "ozone.scm.ratis.server.role.check.interval";
+ public static final TimeDuration
+ OZONE_SCM_RATIS_SERVER_ROLE_CHECK_INTERVAL_DEFAULT
+ = TimeDuration.valueOf(15, TimeUnit.SECONDS);
+
/**
* Never constructed.
*/
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index c6589d4..d9c00e0 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -329,5 +329,6 @@ public final class OzoneConsts {
public static final String GDPR_SECRET = "secret";
public static final String GDPR_ALGORITHM = "algorithm";
-
+ // SCM HA
+ public static final String SCM_SERVICE_ID_DEFAULT = "scmServiceIdDefault";
}
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 5972cf0..90025e2 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1881,6 +1881,186 @@
<tag>OZONE, HDDS, SECURITY</tag>
<description>SCM security server port.</description>
</property>
+ <property>
+ <name>ozone.scm.service.ids</name>
+ <value></value>
+ <tag>OZONE, SCM, HA</tag>
+ <description>
+ Comma-separated list of SCM service IDs. This property allows the client
+ to figure out the quorum of SCM addresses.
+ </description>
+ </property>
+ <property>
+ <name>ozone.scm.internal.service.id</name>
+ <value></value>
+ <tag>OZONE, SCM, HA</tag>
+ <description>
+ Service ID of the SCM. If this is not set, SCM falls back to
+ ozone.scm.service.ids to find the service ID it belongs to.
+ </description>
+ </property>
+ <property>
+ <name>ozone.scm.nodes.EXAMPLESCMSERVICEID</name>
+ <value></value>
+ <tag>OZONE, SCM, HA</tag>
+ <description>
+ Comma-separated list of SCM node Ids for a given SCM service ID (eg.
+ EXAMPLESCMSERVICEID). The SCM service ID should be the value (one of the
+ values if there are multiple) set for the parameter ozone.scm.service.ids.
+
+ Unique identifiers for each SCM node, delimited by commas. This will be
+ used by SCMs in an HA setup to determine all the SCMs
+ belonging to the same SCM service in the cluster. For example, if you
+ used "scmService1" as the SCM service ID previously, and you wanted to
+ use "scm1", "scm2" and "scm3" as the individual IDs of the SCMs,
+ you would configure a property ozone.scm.nodes.scmService1 with the value
+ "scm1,scm2,scm3".
+ </description>
+ </property>
+ <property>
+ <name>ozone.scm.node.id</name>
+ <value></value>
+ <tag>OZONE, SCM, HA</tag>
+ <description>
+ The ID of this SCM node. If the SCM node ID is not configured it
+ is determined automatically by matching the local node's address
+ with the configured address.
+
+ If the node ID cannot be determined from the configuration, then it is set
+ to the scmId from the SCM version file.
+ </description>
+ </property>
+ <property>
+ <name>ozone.scm.ratis.enable</name>
+ <value>false</value>
+ <tag>OZONE, SCM, HA, RATIS</tag>
+ <description>Property to enable or disable Ratis server on SCM.
+ Please note - this is a temporary property to disable SCM Ratis server.
+ </description>
+ </property>
+
+ <property>
+ <name>ozone.scm.ratis.port</name>
+ <value>9872</value>
+ <tag>OZONE, SCM, HA, RATIS</tag>
+ <description>
+ The port number of the SCM's Ratis server.
+ </description>
+ </property>
+
+ <property>
+ <name>ozone.scm.ratis.rpc.type</name>
+ <value>GRPC</value>
+ <tag>OZONE, SCM, HA, RATIS</tag>
+ <description>Ratis supports different kinds of transports like netty, GRPC,
+ Hadoop RPC etc. This picks one of those for this cluster.
+ </description>
+ </property>
+
+ <property>
+ <name>ozone.scm.ratis.storage.dir</name>
+ <value/>
+ <tag>OZONE, SCM, HA, RATIS, STORAGE</tag>
+ <description>This directory is used for storing SCM's Ratis metadata like
+ logs. If this is not set then the default metadata dirs are used. A warning
+ will be logged if this is not set. Ideally, this should be mapped to a
+ fast disk like an SSD.
+ If undefined, SCM ratis storage dir will fall back to ozone.metadata.dirs.
+ This fallback approach is not recommended for production environments.
+ </description>
+ </property>
+
+ <property>
+ <name>ozone.scm.ratis.segment.size</name>
+ <value>16KB</value>
+ <tag>OZONE, SCM, HA, RATIS, PERFORMANCE</tag>
+ <description>The size of the raft segment used by Apache Ratis on SCM.
+ (16 KB by default)
+ </description>
+ </property>
+
+ <property>
+ <name>ozone.scm.ratis.segment.preallocated.size</name>
+ <value>16KB</value>
+ <tag>OZONE, SCM, HA, RATIS, PERFORMANCE</tag>
+ <description>The size of the buffer which is preallocated for raft segment
+ used by Apache Ratis on SCM. (16 KB by default)
+ </description>
+ </property>
+
+ <property>
+ <name>ozone.scm.ratis.log.appender.queue.num-elements</name>
+ <value>1024</value>
+ <tag>OZONE, DEBUG, SCM, HA, RATIS</tag>
+ <description>Number of operations pending with Raft's Log Worker.
+ </description>
+ </property>
+ <property>
+ <name>ozone.scm.ratis.log.appender.queue.byte-limit</name>
+ <value>32MB</value>
+ <tag>OZONE, DEBUG, SCM, HA, RATIS</tag>
+ <description>Byte limit for Raft's Log Worker queue.
+ </description>
+ </property>
+ <property>
+ <name>ozone.scm.ratis.log.purge.gap</name>
+ <value>1000000</value>
+ <tag>OZONE, SCM, HA, RATIS</tag>
+ <description>The minimum gap between log indices for Raft server to purge
+ its log segments after taking snapshot.
+ </description>
+ </property>
+ <property>
+ <name>ozone.scm.ratis.server.request.timeout</name>
+ <value>3s</value>
+ <tag>OZONE, SCM, HA, RATIS</tag>
+ <description>The timeout duration for SCM's ratis server request.</description>
+ </property>
+
+ <property>
+ <name>ozone.scm.ratis.server.retry.cache.timeout</name>
+ <value>600000ms</value>
+ <tag>OZONE, SCM, HA, RATIS</tag>
+ <description>Retry Cache entry timeout for SCM's ratis server.</description>
+ </property>
+
+ <property>
+ <name>ozone.scm.ratis.minimum.timeout</name>
+ <value>1s</value>
+ <tag>OZONE, SCM, HA, RATIS</tag>
+ <description>The minimum timeout duration for SCM's Ratis server rpc.
+ </description>
+ </property>
+
+ <property>
+ <name>ozone.scm.leader.election.minimum.timeout.duration</name>
+ <value>1s</value>
+ <tag>OZONE, SCM, HA, RATIS</tag>
+ <description>The minimum timeout duration for SCM ratis leader election.
+ Default is 1s.
+ </description>
+ </property>
+
+ <property>
+ <name>ozone.scm.ratis.server.failure.timeout.duration</name>
+ <value>120s</value>
+ <tag>OZONE, SCM, HA, RATIS</tag>
+ <description>The timeout duration for ratis server failure detection,
+ once the threshold has been reached, the ratis state machine will be informed
+ about the failure in the ratis ring.
+ </description>
+ </property>
+
+ <property>
+ <name>ozone.scm.ratis.server.role.check.interval</name>
+ <value>15s</value>
+ <tag>OZONE, SCM, HA, RATIS</tag>
+ <description>The interval between role checks performed by the SCM leader
+ on its ratis server. Ratis server informs SCM if it
+ loses the leader role. The scheduled check is a secondary
+ check to ensure that the leader role is updated periodically.
+ </description>
+ </property>
<property>
<name>hdds.metadata.dir</name>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
new file mode 100644
index 0000000..c0364ad
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+
+/**
+ * Static helpers for querying the SCM HA configuration.
+ */
+public final class SCMHAUtils {
+
+  /** Utility class; never instantiated. */
+  private SCMHAUtils() {
+  }
+
+  /**
+   * Tells whether SCM HA is switched on in the given configuration.
+   *
+   * @param conf configuration to read
+   * @return the boolean stored under
+   *         {@code ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY}, with
+   *         {@code ScmConfigKeys.OZONE_SCM_HA_ENABLE_DEFAULT} passed as the
+   *         fallback value
+   */
+  public static boolean isSCMHAEnabled(OzoneConfiguration conf) {
+    final boolean fallback = ScmConfigKeys.OZONE_SCM_HA_ENABLE_DEFAULT;
+    return conf.getBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, fallback);
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java
new file mode 100644
index 0000000..8d66187
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_INTERNAL_SERVICE_ID;
+
+/**
+ * Describes one SCM node: the service and node IDs it is known by, its RPC
+ * address and port, its Ratis port and its HTTP/HTTPS addresses. Instances
+ * are created through {@link Builder} or
+ * {@link #initStandAlone(OzoneConfiguration)}.
+ */
+public final class SCMNodeDetails {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMNodeDetails.class);
+
+  private String scmServiceId;
+  private String scmNodeId;
+  private InetSocketAddress rpcAddress;
+  private int rpcPort;
+  private int ratisPort;
+  private String httpAddress;
+  private String httpsAddress;
+
+  /**
+   * Copies the given values into a new instance; called only from
+   * {@link Builder#build()}.
+   */
+  private SCMNodeDetails(String serviceId, String nodeId,
+      InetSocketAddress rpcAddr, int rpcPort, int ratisPort,
+      String httpAddress, String httpsAddress) {
+    this.scmServiceId = serviceId;
+    this.scmNodeId = nodeId;
+    this.rpcAddress = rpcAddr;
+    this.rpcPort = rpcPort;
+    this.ratisPort = ratisPort;
+    this.httpAddress = httpAddress;
+    this.httpsAddress = httpsAddress;
+  }
+
+  /**
+   * Builds the node details of a stand-alone SCM from the configuration.
+   * The RPC address is the local host with port 0 and the service ID is
+   * {@code OzoneConsts.SCM_SERVICE_ID_DEFAULT}.
+   *
+   * @param conf configuration to read the Ratis port and the ID from
+   * @return node details of the local SCM
+   * @throws IOException if the local host address cannot be resolved
+   */
+  public static SCMNodeDetails initStandAlone(
+      OzoneConfiguration conf) throws IOException {
+    // NOTE(review): the value of ozone.scm.internal.service.id ends up as the
+    // node ID below, not as the service ID - confirm this is intended.
+    final String configuredId = conf.getTrimmed(OZONE_SCM_INTERNAL_SERVICE_ID);
+    final int configuredRatisPort = conf.getInt(
+        ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY,
+        ScmConfigKeys.OZONE_SCM_RATIS_PORT_DEFAULT);
+    final InetSocketAddress localAddress =
+        new InetSocketAddress(InetAddress.getLocalHost(), 0);
+    return new Builder()
+        .setRatisPort(configuredRatisPort)
+        .setRpcAddress(localAddress)
+        .setSCMNodeId(configuredId)
+        .setSCMServiceId(OzoneConsts.SCM_SERVICE_ID_DEFAULT)
+        .build();
+  }
+
+  /** @return the ID of the SCM service this node belongs to. */
+  public String getSCMServiceId() {
+    return scmServiceId;
+  }
+
+  /** @return the ID of this SCM node. */
+  public String getSCMNodeId() {
+    return scmNodeId;
+  }
+
+  /** @return the RPC socket address of this node. */
+  public InetSocketAddress getRpcAddress() {
+    return rpcAddress;
+  }
+
+  /** @return the IP address part of the RPC socket address. */
+  public InetAddress getAddress() {
+    return rpcAddress.getAddress();
+  }
+
+  /** @return the port of the RPC socket address. */
+  public int getRpcPort() {
+    return rpcPort;
+  }
+
+  /** @return the Ratis port of this node. */
+  public int getRatisPort() {
+    return ratisPort;
+  }
+
+  /** @return the RPC address formatted by {@code NetUtils}. */
+  public String getRpcAddressString() {
+    return NetUtils.getHostPortString(rpcAddress);
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder("SCMNodeDetails[");
+    text.append("scmServiceId=").append(scmServiceId);
+    text.append(", scmNodeId=").append(scmNodeId);
+    text.append(", rpcAddress=").append(rpcAddress);
+    text.append(", rpcPort=").append(rpcPort);
+    text.append(", ratisPort=").append(ratisPort);
+    text.append(", httpAddress=").append(httpAddress);
+    text.append(", httpsAddress=").append(httpsAddress);
+    return text.append("]").toString();
+  }
+
+  /**
+   * Fluent builder for {@link SCMNodeDetails}. Values that are never set keep
+   * their Java defaults ({@code null} or 0).
+   */
+  public static class Builder {
+    private String scmServiceId;
+    private String scmNodeId;
+    private InetSocketAddress rpcAddress;
+    private int rpcPort;
+    private int ratisPort;
+    private String httpAddr;
+    private String httpsAddr;
+
+    public Builder setSCMServiceId(String serviceId) {
+      this.scmServiceId = serviceId;
+      return this;
+    }
+
+    public Builder setSCMNodeId(String nodeId) {
+      this.scmNodeId = nodeId;
+      return this;
+    }
+
+    /**
+     * Sets the RPC address; the RPC port is taken from the same address.
+     */
+    public Builder setRpcAddress(InetSocketAddress rpcAddr) {
+      this.rpcAddress = rpcAddr;
+      this.rpcPort = rpcAddr.getPort();
+      return this;
+    }
+
+    public Builder setRatisPort(int port) {
+      this.ratisPort = port;
+      return this;
+    }
+
+    public Builder setHttpAddress(String httpAddress) {
+      this.httpAddr = httpAddress;
+      return this;
+    }
+
+    public Builder setHttpsAddress(String httpsAddress) {
+      this.httpsAddr = httpsAddress;
+      return this;
+    }
+
+    /** @return a new {@link SCMNodeDetails} holding the collected values. */
+    public SCMNodeDetails build() {
+      return new SCMNodeDetails(scmServiceId, scmNodeId, rpcAddress, rpcPort,
+          ratisPort, httpAddr, httpsAddr);
+    }
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/package-info.java
new file mode 100644
index 0000000..06fe168
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+/**
+ * This package contains classes related to SCM HA.
+ */
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMRatisServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMRatisServer.java
new file mode 100644
index 0000000..af1e5c2
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMRatisServer.java
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ratis;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.GroupInfoReply;
+import org.apache.ratis.protocol.GroupInfoRequest;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Class for SCM Ratis Server.
+ */
+public final class SCMRatisServer {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(SCMRatisServer.class);
+
+ private final StorageContainerManager scm;
+ private final SCMStateMachine scmStateMachine;
+
+ private final int port; // port of the Ratis address given to the constructor
+ private final InetSocketAddress scmRatisAddress;
+ private final RaftServer server;
+ private final RaftGroupId raftGroupId; // derived from the SCM service id
+ private final RaftGroup raftGroup; // this SCM plus the configured peers
+ private final RaftPeerId raftPeerId;
+
+ private final ClientId clientId = ClientId.randomId();
+ private final ScheduledExecutorService scheduledRoleChecker;
+ private long roleCheckInitialDelayMs = 1000; // 1 second default
+ private long roleCheckIntervalMs; // NOTE(review): not set in the ctor body
+ private ReentrantReadWriteLock roleCheckLock = new ReentrantReadWriteLock();
+ private Optional<RaftProtos.RaftPeerRole> cachedPeerRole = Optional.empty();
+ private Optional<RaftPeerId> cachedLeaderPeerId = Optional.empty();
+
+ private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
+
+ /**
+  * Hands out the next call id from the shared counter.
+  *
+  * @return a non-negative call id
+  */
+ private static long nextCallId() {
+   final long sequence = CALL_ID_COUNTER.getAndIncrement();
+   // Clearing the sign bit keeps the id non-negative even after the counter
+   // wraps around.
+   return sequence & Long.MAX_VALUE;
+ }
+
+ /**
+  * Builds the Raft server of this SCM and schedules the periodic role check.
+  *
+  * @param conf configuration the Raft properties are read from
+  * @param scm the SCM this Ratis server belongs to
+  * @param raftGroupIdStr string the Raft group id is derived from
+  * @param localRaftPeerId Raft peer id of this SCM
+  * @param addr Ratis address of this SCM; its port is used as server port
+  * @param raftPeers all members of the Raft group
+  * @throws IOException propagated from building the Raft server
+  */
+ private SCMRatisServer(Configuration conf,
+     StorageContainerManager scm,
+     String raftGroupIdStr, RaftPeerId localRaftPeerId,
+     InetSocketAddress addr, List<RaftPeer> raftPeers)
+     throws IOException {
+   this.scm = scm;
+   this.scmRatisAddress = addr;
+   // The port has to be known before the Raft properties are created,
+   // because newRaftProperties reads it.
+   this.port = addr.getPort();
+   RaftProperties serverProperties = newRaftProperties(conf);
+
+   this.raftPeerId = localRaftPeerId;
+   this.raftGroupId = RaftGroupId.valueOf(
+       getRaftGroupIdFromOmServiceId(raftGroupIdStr));
+   this.raftGroup = RaftGroup.valueOf(raftGroupId, raftPeers);
+
+   // Join the peer addresses with ", ". Tracking the separator (instead of
+   // cutting off a leading ", " afterwards) also works for an empty list.
+   StringBuilder raftPeersStr = new StringBuilder();
+   String separator = "";
+   for (RaftPeer peer : raftPeers) {
+     raftPeersStr.append(separator).append(peer.getAddress());
+     separator = ", ";
+   }
+   LOG.info("Instantiating SCM Ratis server with GroupID: {} and " +
+       "Raft Peers: {}", raftGroupIdStr, raftPeersStr.toString());
+   this.scmStateMachine = getStateMachine();
+
+   this.server = RaftServer.newBuilder()
+       .setServerId(this.raftPeerId)
+       .setGroup(this.raftGroup)
+       .setProperties(serverProperties)
+       .setStateMachine(scmStateMachine)
+       .build();
+
+   // Run a scheduler to check and update the server role on the leader
+   // periodically.
+   // NOTE(review): roleCheckIntervalMs is not assigned in this constructor
+   // body; confirm it is set before this point, since
+   // scheduleWithFixedDelay rejects a non-positive delay.
+   this.scheduledRoleChecker = Executors.newSingleThreadScheduledExecutor();
+   this.scheduledRoleChecker.scheduleWithFixedDelay(() -> {
+     // Run this check only on the leader SCM
+     if (cachedPeerRole.isPresent() &&
+         cachedPeerRole.get() == RaftProtos.RaftPeerRole.LEADER) {
+       updateServerRole();
+     }
+   }, roleCheckInitialDelayMs, roleCheckIntervalMs, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+  * Creates the Ratis server of the local SCM. The Raft group consists of the
+  * local SCM followed by the given peers; each member is addressed by its
+  * node address combined with its Ratis port.
+  *
+  * @param conf configuration handed to the server
+  * @param scm the SCM the server belongs to
+  * @param scmNodeDetails details of the local SCM node
+  * @param peers details of the other SCM nodes
+  * @return the new Ratis server
+  * @throws IOException propagated from the server constructor
+  */
+ public static SCMRatisServer newSCMRatisServer(
+     Configuration conf, StorageContainerManager scm,
+     SCMNodeDetails scmNodeDetails, List<SCMNodeDetails> peers)
+     throws IOException {
+   final RaftPeerId selfId =
+       RaftPeerId.getRaftPeerId(scmNodeDetails.getSCMNodeId());
+   final InetSocketAddress selfRatisAddr = new InetSocketAddress(
+       scmNodeDetails.getAddress(), scmNodeDetails.getRatisPort());
+
+   // The local SCM is the first member of the ring; the other SCMs follow
+   // in the order they were passed in.
+   final List<RaftPeer> ring = new ArrayList<>();
+   ring.add(new RaftPeer(selfId, selfRatisAddr));
+   for (SCMNodeDetails other : peers) {
+     final InetSocketAddress otherRatisAddr = new InetSocketAddress(
+         other.getAddress(), other.getRatisPort());
+     final RaftPeerId otherId = RaftPeerId.valueOf(other.getSCMNodeId());
+     ring.add(new RaftPeer(otherId, otherRatisAddr));
+   }
+
+   return new SCMRatisServer(conf, scm, scmNodeDetails.getSCMServiceId(),
+       selfId, selfRatisAddr, ring);
+ }
+
+ private UUID getRaftGroupIdFromOmServiceId(String scmServiceId) {
+ return UUID.nameUUIDFromBytes(scmServiceId.getBytes(
+ StandardCharsets.UTF_8));
+ }
+
+ private SCMStateMachine getStateMachine() {
+ return new SCMStateMachine(this);
+ }
+
+ private RaftProperties newRaftProperties(Configuration conf){
+ final RaftProperties properties = new RaftProperties();
+ // Set RPC type
+ final String rpcType = conf.get(
+ ScmConfigKeys.OZONE_SCM_RATIS_RPC_TYPE_KEY,
+ ScmConfigKeys.OZONE_SCM_RATIS_RPC_TYPE_DEFAULT);
+ final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType);
+ RaftConfigKeys.Rpc.setType(properties, rpc);
+ // Set the ratis port number
+ if (rpc == SupportedRpcType.GRPC) {
+ GrpcConfigKeys.Server.setPort(properties, port);
+ } else if (rpc == SupportedRpcType.NETTY) {
+ NettyConfigKeys.Server.setPort(properties, port);
+ }
+ // Set Ratis storage directory
+ String storageDir = SCMRatisServer.getSCMRatisDirectory(conf);
+ RaftServerConfigKeys.setStorageDirs(properties,
+ Collections.singletonList(new File(storageDir)));
+ // Set RAFT segment size
+ final int raftSegmentSize = (int) conf.getStorageSize(
+ ScmConfigKeys.OZONE_SCM_RATIS_SEGMENT_SIZE_KEY,
+ ScmConfigKeys.OZONE_SCM_RATIS_SEGMENT_SIZE_DEFAULT,
+ StorageUnit.BYTES);
+ RaftServerConfigKeys.Log.setSegmentSizeMax(properties,
+ SizeInBytes.valueOf(raftSegmentSize));
+ // Set RAFT segment pre-allocated size
+ final int raftSegmentPreallocatedSize = (int) conf.getStorageSize(
+ ScmConfigKeys.OZONE_SCM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY,
+ ScmConfigKeys.OZONE_SCM_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT,
+ StorageUnit.BYTES);
+ int logAppenderQueueNumElements = conf.getInt(
+ ScmConfigKeys.OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS,
+ ScmConfigKeys.OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS_DEFAULT);
+ final int logAppenderQueueByteLimit = (int) conf.getStorageSize(
+ ScmConfigKeys.OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT,
+ ScmConfigKeys.OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT,
+ StorageUnit.BYTES);
+ RaftServerConfigKeys.Log.Appender.setBufferElementLimit(properties,
+ logAppenderQueueNumElements);
+ RaftServerConfigKeys.Log.Appender.setBufferByteLimit(properties,
+ SizeInBytes.valueOf(logAppenderQueueByteLimit));
+ RaftServerConfigKeys.Log.setPreallocatedSize(properties,
+ SizeInBytes.valueOf(raftSegmentPreallocatedSize));
+ RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(properties,
+ false);
+ final int logPurgeGap = conf.getInt(
+ ScmConfigKeys.OZONE_SCM_RATIS_LOG_PURGE_GAP,
+ ScmConfigKeys.OZONE_SCM_RATIS_LOG_PURGE_GAP_DEFAULT);
+ RaftServerConfigKeys.Log.setPurgeGap(properties, logPurgeGap);
+ // For grpc set the maximum message size
+ // TODO: calculate the optimal max message size
+ GrpcConfigKeys.setMessageSizeMax(properties,
+ SizeInBytes.valueOf(logAppenderQueueByteLimit));
+
+ // Set the server request timeout
+ TimeUnit serverRequestTimeoutUnit =
+ ScmConfigKeys.OZONE_SCM_RATIS_SERVER_REQUEST_TIMEOUT_DEFAULT.getUnit();
+ long serverRequestTimeoutDuration = conf.getTimeDuration(
+ ScmConfigKeys.OZONE_SCM_RATIS_SERVER_REQUEST_TIMEOUT_KEY,
+ ScmConfigKeys.OZONE_SCM_RATIS_SERVER_REQUEST_TIMEOUT_DEFAULT
+ .getDuration(), serverRequestTimeoutUnit);
+ final TimeDuration serverRequestTimeout = TimeDuration.valueOf(
+ serverRequestTimeoutDuration, serverRequestTimeoutUnit);
+ RaftServerConfigKeys.Rpc.setRequestTimeout(properties,
+ serverRequestTimeout);
+ // Set timeout for server retry cache entry
+ TimeUnit retryCacheTimeoutUnit = ScmConfigKeys
+ .OZONE_SCM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DEFAULT.getUnit();
+ long retryCacheTimeoutDuration = conf.getTimeDuration(
+ ScmConfigKeys.OZONE_SCM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_KEY,
+ ScmConfigKeys.OZONE_SCM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DEFAULT
+ .getDuration(), retryCacheTimeoutUnit);
+ final TimeDuration retryCacheTimeout = TimeDuration.valueOf(
+ retryCacheTimeoutDuration, retryCacheTimeoutUnit);
+ RaftServerConfigKeys.RetryCache.setExpiryTime(properties,
+ retryCacheTimeout);
+ // Set the server min and max timeout
+ TimeUnit serverMinTimeoutUnit =
+ ScmConfigKeys.OZONE_SCM_RATIS_MINIMUM_TIMEOUT_DEFAULT.getUnit();
+ long serverMinTimeoutDuration = conf.getTimeDuration(
+ ScmConfigKeys.OZONE_SCM_RATIS_MINIMUM_TIMEOUT_KEY,
+ ScmConfigKeys.OZONE_SCM_RATIS_MINIMUM_TIMEOUT_DEFAULT
+ .getDuration(), serverMinTimeoutUnit);
+ final TimeDuration serverMinTimeout = TimeDuration.valueOf(
+ serverMinTimeoutDuration, serverMinTimeoutUnit);
+ long serverMaxTimeoutDuration =
+ serverMinTimeout.toLong(TimeUnit.MILLISECONDS) + 200;
+ final TimeDuration serverMaxTimeout = TimeDuration.valueOf(
+ serverMaxTimeoutDuration, serverMinTimeoutUnit);
+ RaftServerConfigKeys.Rpc.setTimeoutMin(properties,
+ serverMinTimeout);
+ RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
+ serverMaxTimeout);
+ // Set the number of maximum cached segments
+ RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2);
+ // TODO: set max write buffer size
+ // Set the ratis leader election timeout
+ TimeUnit leaderElectionMinTimeoutUnit =
+ ScmConfigKeys.OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT
+ .getUnit();
+ long leaderElectionMinTimeoutduration = conf.getTimeDuration(
+ ScmConfigKeys.OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
+ ScmConfigKeys.OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT
+ .getDuration(), leaderElectionMinTimeoutUnit);
+ final TimeDuration leaderElectionMinTimeout = TimeDuration.valueOf(
+ leaderElectionMinTimeoutduration, leaderElectionMinTimeoutUnit);
+ RaftServerConfigKeys.Rpc.setTimeoutMin(properties,
+ leaderElectionMinTimeout);
+ long leaderElectionMaxTimeout = leaderElectionMinTimeout.toLong(
+ TimeUnit.MILLISECONDS) + 200;
+ RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
+ TimeDuration.valueOf(leaderElectionMaxTimeout, TimeUnit.MILLISECONDS));
+ TimeUnit nodeFailureTimeoutUnit =
+ ScmConfigKeys.OZONE_SCM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT
+ .getUnit();
+ long nodeFailureTimeoutDuration = conf.getTimeDuration(
+ ScmConfigKeys.OZONE_SCM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY,
+ ScmConfigKeys.OZONE_SCM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT
+ .getDuration(), nodeFailureTimeoutUnit);
+ final TimeDuration nodeFailureTimeout = TimeDuration.valueOf(
+ nodeFailureTimeoutDuration, nodeFailureTimeoutUnit);
+ RaftServerConfigKeys.Notification.setNoLeaderTimeout(properties,
+ nodeFailureTimeout);
+ RaftServerConfigKeys.Rpc.setSlownessTimeout(properties,
+ nodeFailureTimeout);
+
+ // Ratis leader role check
+ TimeUnit roleCheckIntervalUnit =
+ ScmConfigKeys.OZONE_SCM_RATIS_SERVER_ROLE_CHECK_INTERVAL_DEFAULT
+ .getUnit();
+ long roleCheckIntervalDuration = conf.getTimeDuration(
+ ScmConfigKeys.OZONE_SCM_RATIS_SERVER_ROLE_CHECK_INTERVAL_KEY,
+ ScmConfigKeys.OZONE_SCM_RATIS_SERVER_ROLE_CHECK_INTERVAL_DEFAULT
+ .getDuration(), nodeFailureTimeoutUnit);
+ this.roleCheckIntervalMs = TimeDuration.valueOf(
+ roleCheckIntervalDuration, roleCheckIntervalUnit)
+ .toLong(TimeUnit.MILLISECONDS);
+ this.roleCheckInitialDelayMs = leaderElectionMinTimeout
+ .toLong(TimeUnit.MILLISECONDS);
+
+ return properties;
+ }
+
+ /**
+  * Logs this server's class name, Ratis id and port, then starts the
+  * underlying Ratis server.
+  *
+  * @throws IOException propagated from the underlying server's start call
+  */
+ public void start() throws IOException {
+   final String serverName = getClass().getSimpleName();
+   LOG.info("Starting {} {} at port {}", serverName, server.getId(), port);
+   server.start();
+ }
+
+ /**
+  * Stop the Ratis server and then the SCM state machine.
+  *
+  * @throws RuntimeException wrapping the IOException raised while closing
+  *         the Ratis server
+  */
+ public void stop() {
+   try {
+     server.close();
+   } catch (IOException e) {
+     throw new RuntimeException(e);
+   } finally {
+     // Stop the state machine even when closing the server fails, so a
+     // failed close does not leave the state machine running.
+     scmStateMachine.stop();
+   }
+ }
+
+ /**
+  * Reads the cached peer role under the read lock.
+  *
+  * @return true only when a role is cached and it is LEADER
+  */
+ private boolean checkCachedPeerRoleIsLeader() {
+   this.roleCheckLock.readLock().lock();
+   try {
+     return cachedPeerRole.isPresent()
+         && cachedPeerRole.get() == RaftProtos.RaftPeerRole.LEADER;
+   } finally {
+     this.roleCheckLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Tells whether this server is the leader according to the cached role,
+  * refreshing the cache once from the Ratis server when the cached value
+  * does not already say LEADER.
+  *
+  * @return true if the (possibly refreshed) cached role is LEADER
+  */
+ public boolean isLeader() {
+   if (!checkCachedPeerRoleIsLeader()) {
+     // Cache does not report leadership: pull the current role from the
+     // Ratis server, then answer from the refreshed cache.
+     updateServerRole();
+     return checkCachedPeerRoleIsLeader();
+   }
+   return true;
+ }
+
+ /**
+  * Returns the life-cycle state reported by the underlying Ratis server.
+  */
+ @VisibleForTesting
+ public LifeCycle.State getServerState() {
+   final LifeCycle.State currentState = server.getLifeCycleState();
+   return currentState;
+ }
+
+ /**
+  * Returns the Raft peer id held by this server.
+  */
+ @VisibleForTesting
+ public RaftPeerId getRaftPeerId() {
+   return raftPeerId;
+ }
+
+ /**
+  * Returns the Raft group held by this server.
+  */
+ public RaftGroup getRaftGroup() {
+   return raftGroup;
+ }
+
+ /**
+  * Resolve the local directory where ratis logs will be stored.
+  *
+  * @param conf configuration to read
+  * @return the value of OZONE_SCM_RATIS_STORAGE_DIR when it is set and
+  *         non-empty, otherwise the result of
+  *         ServerUtils.getDefaultRatisDirectory(conf)
+  */
+ public static String getSCMRatisDirectory(Configuration conf) {
+   final String configuredDir =
+       conf.get(ScmConfigKeys.OZONE_SCM_RATIS_STORAGE_DIR);
+   if (!Strings.isNullOrEmpty(configuredDir)) {
+     return configuredDir;
+   }
+   return ServerUtils.getDefaultRatisDirectory(conf);
+ }
+
+ /**
+  * Returns the cached leader peer id, read under the read lock. The
+  * Optional is empty when no leader id is cached.
+  */
+ public Optional<RaftPeerId> getCachedLeaderPeerId() {
+   this.roleCheckLock.readLock().lock();
+   try {
+     final Optional<RaftPeerId> leaderSnapshot = cachedLeaderPeerId;
+     return leaderSnapshot;
+   } finally {
+     this.roleCheckLock.readLock().unlock();
+   }
+ }
+
+ /**
+  * Returns the port held by this server (the same value logged by start()).
+  */
+ public int getServerPort() {
+   return this.port;
+ }
+
+ /**
+  * Refreshes the cached role and cached leader id from the group info
+  * returned by the Ratis server.
+  * <ul>
+  *   <li>LEADER: the leader id is this server's own peer id.</li>
+  *   <li>FOLLOWER: the leader id is taken from the follower info, or left
+  *       unset when that id is null or empty.</li>
+  *   <li>Any other role: the leader id is unset.</li>
+  * </ul>
+  * If the group info cannot be retrieved, both cached values are cleared.
+  */
+ public void updateServerRole() {
+   try {
+     final RaftProtos.RoleInfoProto roleInfo =
+         getGroupInfo().getRoleInfoProto();
+     final RaftProtos.RaftPeerRole role = roleInfo.getRole();
+
+     // Leader id to cache; remains null unless this node is the leader or
+     // a follower that knows its leader.
+     RaftPeerId leaderId = null;
+     if (role.equals(RaftProtos.RaftPeerRole.LEADER)) {
+       leaderId = raftPeerId;
+     } else if (role.equals(RaftProtos.RaftPeerRole.FOLLOWER)) {
+       final ByteString leaderBytes = roleInfo.getFollowerInfo()
+           .getLeaderInfo().getId().getId();
+       // A follower may have no leader id to report, e.g. in a 3 node SCM
+       // Ratis ring with 2 nodes down there will be no leader.
+       if (leaderBytes != null && !leaderBytes.isEmpty()) {
+         leaderId = RaftPeerId.valueOf(leaderBytes);
+       }
+     }
+     setServerRole(role, leaderId);
+   } catch (IOException e) {
+     // NOTE(review): the message mentions UNRECOGNIZED but the cached role
+     // is actually cleared (null) below - confirm which one is intended.
+     LOG.error("Failed to retrieve RaftPeerRole. Setting cached role to " +
+         "{} and resetting leader info.",
+         RaftProtos.RaftPeerRole.UNRECOGNIZED, e);
+     setServerRole(null, null);
+   }
+ }
+
+ /**
+  * Builds a group-info request for this server's client id, peer id and
+  * group id with the next call id, and returns the Ratis server's reply.
+  *
+  * @throws IOException propagated from the Ratis server's getGroupInfo call
+  */
+ private GroupInfoReply getGroupInfo() throws IOException {
+   return server.getGroupInfo(new GroupInfoRequest(
+       clientId, raftPeerId, raftGroupId, nextCallId()));
+ }
+
+ private void setServerRole(RaftProtos.RaftPeerRole currentRole,
+ RaftPeerId leaderPeerId) {
+ this.roleCheckLock.writeLock().lock();
+ try {
+ this.cachedPeerRole = Optional.ofNullable(currentRole);
+ this.cachedLeaderPeerId = Optional.ofNullable(leaderPeerId);
+ } finally {
+ this.roleCheckLock.writeLock().unlock();
+ }
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMStateMachine.java
new file mode 100644
index 0000000..502260a
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMStateMachine.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ratis;
+
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+
+/**
+ * State machine for SCM, extending the Ratis BaseStateMachine.
+ * Currently a placeholder: the constructor and stop() do nothing.
+ */
+public class SCMStateMachine extends BaseStateMachine {
+  //TODO to be implemented
+  /**
+   * Creates the state machine.
+   *
+   * @param ratisServer the owning SCM Ratis server; not used yet
+   */
+  public SCMStateMachine(SCMRatisServer ratisServer) {
+  }
+
+  /**
+   * Stops the state machine. No-op for now.
+   */
+  public void stop() {
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 9dcb8f2..884710c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -29,6 +29,7 @@ import com.google.protobuf.BlockingService;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
+import java.util.Collections;
import java.util.Objects;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -36,6 +37,9 @@ import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
+import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
+import org.apache.hadoop.hdds.scm.ratis.SCMRatisServer;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -190,6 +194,9 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
private CertificateServer certificateServer;
private GrpcTlsConfig grpcTlsConfig;
+ // SCM HA related
+ private SCMRatisServer scmRatisServer;
+
private JvmPauseMonitor jvmPauseMonitor;
private final OzoneConfiguration configuration;
private final SafeModeHandler safeModeHandler;
@@ -258,6 +265,12 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
loginAsSCMUser(conf);
}
+ if (SCMHAUtils.isSCMHAEnabled(conf)) {
+ initializeRatisServer();
+ } else {
+ scmRatisServer = null;
+ }
+
// Creates the SCM DBs or opens them if it exists.
// A valid pointer to the store is required by all the other services below.
initalizeMetadataStore(conf, configurator);
@@ -1107,4 +1120,18 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
public NetworkTopology getClusterMap() {
return this.clusterMap;
}
+
+ /**
+  * Creates the standalone SCM Ratis server if none has been created yet
+  * and logs its port. This method only constructs the server; it does not
+  * start it.
+  *
+  * @throws IOException propagated from building the node details or the
+  *         Ratis server
+  */
+ private void initializeRatisServer() throws IOException {
+   if (scmRatisServer == null) {
+     SCMNodeDetails scmNodeDetails = SCMNodeDetails
+         .initStandAlone(configuration);
+     //TODO enable Ratis ring
+     // Typed empty peer list; avoids the raw-type Collections.EMPTY_LIST
+     // and the unchecked conversion warning it causes.
+     scmRatisServer = SCMRatisServer.newSCMRatisServer(configuration, this,
+         scmNodeDetails, Collections.emptyList());
+     if (scmRatisServer != null) {
+       LOG.info("SCM Ratis server initialized at port {}",
+           scmRatisServer.getServerPort());
+     }
+   }
+ }
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ratis/TestSCMRatisServer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ratis/TestSCMRatisServer.java
new file mode 100644
index 0000000..f29fb5f
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ratis/TestSCMRatisServer.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ratis;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.util.LifeCycle;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+
+/**
+ * Test class for SCM Ratis Server: checks that a standalone server reaches
+ * the RUNNING state and that the Raft group id is derived from the SCM
+ * service id.
+ */
+public class TestSCMRatisServer {
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  private OzoneConfiguration conf;
+  private SCMRatisServer scmRatisServer;
+  private StorageContainerManager scm;
+  private String scmId;
+  private SCMNodeDetails scmNodeDetails;
+  private static final long LEADER_ELECTION_TIMEOUT = 500L;
+
+  /**
+   * Initializes SCM storage, starts an SCM and starts a standalone SCM
+   * Ratis server on top of it.
+   */
+  @Before
+  public void init() throws Exception {
+    conf = new OzoneConfiguration();
+    scmId = UUID.randomUUID().toString();
+    conf.setTimeDuration(
+        ScmConfigKeys.OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
+        LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+    int ratisPort = conf.getInt(
+        ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY,
+        ScmConfigKeys.OZONE_SCM_RATIS_PORT_DEFAULT);
+    InetSocketAddress rpcAddress = new InetSocketAddress(
+        InetAddress.getLocalHost(), 0);
+    scmNodeDetails = new SCMNodeDetails.Builder()
+        .setRatisPort(ratisPort)
+        .setRpcAddress(rpcAddress)
+        .setSCMNodeId(scmId)
+        .setSCMServiceId(OzoneConsts.SCM_SERVICE_ID_DEFAULT)
+        .build();
+
+    // Standalone SCM Ratis server
+    initSCM();
+    scm = HddsTestUtils.getScm(conf);
+    scm.start();
+    scmRatisServer = SCMRatisServer.newSCMRatisServer(
+        conf, scm, scmNodeDetails, Collections.emptyList());
+    scmRatisServer.start();
+  }
+
+  /**
+   * Stops the Ratis server (if still owned by the fixture) and the SCM.
+   */
+  @After
+  public void shutdown() {
+    try {
+      if (scmRatisServer != null) {
+        scmRatisServer.stop();
+      }
+    } finally {
+      // Stop SCM even if stopping the Ratis server throws.
+      if (scm != null) {
+        scm.stop();
+      }
+    }
+  }
+
+  @Test
+  public void testStartSCMRatisServer() throws Exception {
+    Assert.assertEquals("Ratis Server should be in running state",
+        LifeCycle.State.RUNNING, scmRatisServer.getServerState());
+  }
+
+  /**
+   * Starts a second Ratis server with a custom service id and verifies
+   * that its Raft group id equals the name-based UUID of that service id.
+   */
+  @Test
+  public void verifyRaftGroupIdGenerationWithCustomOmServiceId() throws
+      Exception {
+    String customScmServiceId = "scmIdCustom123";
+    OzoneConfiguration newConf = new OzoneConfiguration();
+    String newScmId = UUID.randomUUID().toString();
+    String path = GenericTestUtils.getTempPath(newScmId);
+    Path metaDirPath = Paths.get(path, "scm-meta");
+    newConf.set(HddsConfigKeys.OZONE_METADATA_DIRS, metaDirPath.toString());
+    newConf.setTimeDuration(
+        ScmConfigKeys.OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
+        LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+    int ratisPort = 9873;
+    InetSocketAddress rpcAddress = new InetSocketAddress(
+        InetAddress.getLocalHost(), 0);
+    SCMNodeDetails nodeDetails = new SCMNodeDetails.Builder()
+        .setRpcAddress(rpcAddress)
+        .setRatisPort(ratisPort)
+        .setSCMNodeId(newScmId)
+        .setSCMServiceId(customScmServiceId)
+        .build();
+    // Stop the server started in init() and clear the field so that
+    // shutdown() does not stop it a second time.
+    scmRatisServer.stop();
+    scmRatisServer = null;
+    // Starts a single node Ratis server
+    SCMRatisServer newScmRatisServer = SCMRatisServer
+        .newSCMRatisServer(newConf, scm, nodeDetails,
+            Collections.emptyList());
+    newScmRatisServer.start();
+    try {
+      UUID uuid = UUID.nameUUIDFromBytes(customScmServiceId.getBytes());
+      RaftGroupId raftGroupId = newScmRatisServer.getRaftGroup().getGroupId();
+      Assert.assertEquals(uuid, raftGroupId.getUuid());
+      // JUnit order is (expected, actual).
+      Assert.assertEquals(16, raftGroupId.toByteString().size());
+    } finally {
+      // Release the server even when an assertion fails.
+      newScmRatisServer.stop();
+    }
+  }
+
+  /**
+   * Creates a fresh metadata directory, points the configuration at it and
+   * initializes the SCM storage with new cluster and SCM ids.
+   */
+  private void initSCM() throws IOException {
+    String clusterId = UUID.randomUUID().toString();
+    // NOTE(review): this overwrites the scmId assigned in init() after
+    // scmNodeDetails was already built with the earlier value - confirm
+    // that the two ids are meant to differ.
+    scmId = UUID.randomUUID().toString();
+
+    final String path = folder.newFolder().toString();
+    Path scmPath = Paths.get(path, "scm-meta");
+    Files.createDirectories(scmPath);
+    conf.set(OZONE_METADATA_DIRS, scmPath.toString());
+    SCMStorageConfig scmStore = new SCMStorageConfig(conf);
+    scmStore.setClusterId(clusterId);
+    scmStore.setScmId(scmId);
+    // writes the version file properties
+    scmStore.initialize();
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index d7a8d20..caf5277 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@ -44,6 +44,7 @@ public class TestOzoneConfigurationFields extends TestConfigurationFieldsBase {
errorIfMissingXmlProps = true;
xmlPropsToSkipCompare.add("hadoop.tags.custom");
xmlPropsToSkipCompare.add("ozone.om.nodes.EXAMPLEOMSERVICEID");
+ xmlPropsToSkipCompare.add("ozone.scm.nodes.EXAMPLESCMSERVICEID");
xmlPrefixToSkipCompare.add("ipc.client.rpc-timeout.ms");
addPropertiesNotInXml();
}
@@ -56,6 +57,7 @@ public class TestOzoneConfigurationFields extends TestConfigurationFieldsBase {
HddsConfigKeys.HDDS_SECURITY_PROVIDER,
HddsConfigKeys.HDDS_X509_CRL_NAME, // HDDS-2873
OMConfigKeys.OZONE_OM_NODES_KEY,
+ ScmConfigKeys.OZONE_SCM_NODES_KEY,
OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE,
OzoneConfigKeys.OZONE_S3_AUTHINFO_MAX_LIFETIME_KEY,
ReconServerConfigKeys.OZONE_RECON_SCM_DB_DIR
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org