You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by li...@apache.org on 2020/06/02 03:11:11 UTC
[hadoop-ozone] 02/07: HDDS-3187 Construct SCM StateMachine. (#819)
This is an automated email from the ASF dual-hosted git repository.
licheng pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit 4c035bb2473b1ebf9e87f596d832b6c5473ce62b
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Fri Apr 17 15:39:03 2020 +0800
HDDS-3187 Construct SCM StateMachine. (#819)
Co-authored-by: Li Cheng <ti...@tencent.com>
---
.../java/org/apache/hadoop/ozone/OzoneConsts.java | 5 +
.../java/org/apache/hadoop/hdds/scm/ScmUtils.java | 12 ++
.../hdds/scm/container/ContainerManager.java | 5 +
.../hdds/scm/container/SCMContainerManager.java | 21 +-
.../org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java | 42 ++++
.../apache/hadoop/hdds/scm/ha/SCMNodeDetails.java | 18 +-
.../hadoop/hdds/scm/ratis/SCMStateMachine.java | 35 ---
.../hdds/scm/server/StorageContainerManager.java | 59 ++++-
.../scm/{ => server}/ratis/SCMRatisServer.java | 25 ++-
.../scm/server/ratis/SCMRatisSnapshotInfo.java | 179 +++++++++++++++
.../hdds/scm/server/ratis/SCMStateMachine.java | 240 +++++++++++++++++++++
.../hdds/scm/{ => server}/ratis/package-info.java | 2 +-
.../scm/{ => server}/ratis/TestSCMRatisServer.java | 23 +-
.../hdds/scm/server/ratis/TestSCMStateMachine.java | 120 +++++++++++
14 files changed, 714 insertions(+), 72 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index bb8e145..a8b48ba 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -352,4 +352,9 @@ public final class OzoneConsts {
// SCM HA
public static final String SCM_SERVICE_ID_DEFAULT = "scmServiceIdDefault";
+
+ // SCM Ratis snapshot file to store the last applied index
+ public static final String SCM_RATIS_SNAPSHOT_INDEX = "scmRatisSnapshotIndex";
+
+ public static final String SCM_RATIS_SNAPSHOT_TERM = "scmRatisSnapshotTerm";
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
index 426341a..bb48654 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.hdds.scm.safemode.Precheck;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+
/**
* SCM utility class.
*/
@@ -48,4 +50,14 @@ public final class ScmUtils {
}
}
+ /**
+ * Create SCM directory file based on given path.
+ */
+ public static File createSCMDir(String dirPath) {
+ File dirFile = new File(dirPath);
+ if (!dirFile.mkdirs() && !dirFile.exists()) {
+ throw new IllegalArgumentException("Unable to create path: " + dirFile);
+ }
+ return dirFile;
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
index 43c1ced..f17a2f4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
@@ -189,4 +189,9 @@ public interface ContainerManager extends Closeable {
* @param success
*/
void notifyContainerReportProcessing(boolean isFullReport, boolean success);
+
+ /**
+ * Flush metadata of container manager if it is required to be persisted.
+ */
+ void flushDB() throws IOException;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
index 9f47608..ee8c689 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
@@ -365,13 +365,20 @@ public class SCMContainerManager implements ContainerManager {
}
}
- /**
- * Update deleteTransactionId according to deleteTransactionMap.
- *
- * @param deleteTransactionMap Maps the containerId to latest delete
- * transaction id for the container.
- * @throws IOException
- */
+ @Override
+ public void flushDB() throws IOException {
+ if (containerStore != null) {
+ containerStore.flushDB(true);
+ }
+ }
+
+ /**
+ * Update deleteTransactionId according to deleteTransactionMap.
+ *
+ * @param deleteTransactionMap Maps the containerId to latest delete
+ * transaction id for the container.
+ * @throws IOException
+ */
public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
throws IOException {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
index c0364ad..eb22566 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
@@ -18,8 +18,15 @@
package org.apache.hadoop.hdds.scm.ha;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.ScmUtils;
+import org.apache.hadoop.hdds.scm.server.ratis.SCMRatisServer;
+
+import java.io.File;
+import java.util.Collection;
/**
* Utility class used by SCM HA.
@@ -34,4 +41,39 @@ public final class SCMHAUtils {
return conf.getBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY,
ScmConfigKeys.OZONE_SCM_HA_ENABLE_DEFAULT);
}
+
+ public static File createSCMRatisDir(OzoneConfiguration conf)
+ throws IllegalArgumentException {
+ String scmRatisDir = SCMRatisServer.getSCMRatisDirectory(conf);
+ if (scmRatisDir == null || scmRatisDir.isEmpty()) {
+ throw new IllegalArgumentException(HddsConfigKeys.OZONE_METADATA_DIRS +
+ " must be defined.");
+ }
+ return ScmUtils.createSCMDir(scmRatisDir);
+ }
+
+ /**
+ * Get a collection of all scmNodeIds for the given scmServiceId.
+ */
+ public static Collection<String> getSCMNodeIds(Configuration conf,
+ String scmServiceId) {
+ String key = addSuffix(ScmConfigKeys.OZONE_SCM_NODES_KEY, scmServiceId);
+ return conf.getTrimmedStringCollection(key);
+ }
+
+ public static String getLocalSCMNodeId(String scmServiceId) {
+ return addSuffix(ScmConfigKeys.OZONE_SCM_NODES_KEY, scmServiceId);
+ }
+
+ /**
+ * Add non empty and non null suffix to a key.
+ */
+ private static String addSuffix(String key, String suffix) {
+ if (suffix == null || suffix.isEmpty()) {
+ return key;
+ }
+ assert !suffix.startsWith(".") :
+ "suffix '" + suffix + "' should not already have '.' prepended.";
+ return key + "." + suffix;
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java
index 8d66187..2390cb3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdds.scm.ha;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.ozone.OzoneConsts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,6 +28,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_INTERNAL_SERVICE_ID;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_SERVICE_IDS_KEY;
/**
* Construct SCM node details.
@@ -153,6 +153,18 @@ public final class SCMNodeDetails {
public static SCMNodeDetails initStandAlone(
OzoneConfiguration conf) throws IOException {
String localSCMServiceId = conf.getTrimmed(OZONE_SCM_INTERNAL_SERVICE_ID);
+ if (localSCMServiceId == null) {
+ // No internal SCM service id is set, so fall back to the value
+ // configured under OZONE_SCM_SERVICE_IDS_KEY.
+ LOG.info("{} is not defined, falling back to {} to find serviceID for "
+ + "SCM if it is HA enabled cluster",
+ OZONE_SCM_INTERNAL_SERVICE_ID, OZONE_SCM_SERVICE_IDS_KEY);
+ localSCMServiceId = conf.getTrimmed(
+ OZONE_SCM_SERVICE_IDS_KEY);
+ } else {
+ LOG.info("ServiceID for SCM is {}", localSCMServiceId);
+ }
+ String localSCMNodeId = SCMHAUtils.getLocalSCMNodeId(localSCMServiceId);
int ratisPort = conf.getInt(
ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY,
ScmConfigKeys.OZONE_SCM_RATIS_PORT_DEFAULT);
@@ -161,8 +173,8 @@ public final class SCMNodeDetails {
SCMNodeDetails scmNodeDetails = new SCMNodeDetails.Builder()
.setRatisPort(ratisPort)
.setRpcAddress(rpcAddress)
- .setSCMNodeId(localSCMServiceId)
- .setSCMServiceId(OzoneConsts.SCM_SERVICE_ID_DEFAULT)
+ .setSCMNodeId(localSCMNodeId)
+ .setSCMServiceId(localSCMServiceId)
.build();
return scmNodeDetails;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMStateMachine.java
deleted file mode 100644
index 502260a..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMStateMachine.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm.ratis;
-
-import org.apache.ratis.statemachine.impl.BaseStateMachine;
-
-/**
- * Class for SCM StateMachine.
- */
-public class SCMStateMachine extends BaseStateMachine {
- //TODO to be implemented
- public SCMStateMachine(SCMRatisServer ratisServer) {
-
- }
-
- public void stop() {
- return;
- }
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 6f978bd..b8527bc 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -25,6 +25,13 @@ import javax.management.ObjectName;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.protobuf.BlockingService;
+
+import java.io.File;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Collection;
@@ -45,7 +52,8 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
-import org.apache.hadoop.hdds.scm.ratis.SCMRatisServer;
+import org.apache.hadoop.hdds.scm.server.ratis.SCMRatisServer;
+import org.apache.hadoop.hdds.scm.server.ratis.SCMRatisSnapshotInfo;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -114,13 +122,9 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.util.JvmPauseMonitor;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.protobuf.BlockingService;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT_DEFAULT;
import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.ratis.server.protocol.TermIndex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -195,6 +199,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
// SCM HA related
private SCMRatisServer scmRatisServer;
+ private SCMRatisSnapshotInfo scmRatisSnapshotInfo;
+ private File scmRatisSnapshotDir;
private JvmPauseMonitor jvmPauseMonitor;
private final OzoneConfiguration configuration;
@@ -264,6 +270,9 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
}
if (SCMHAUtils.isSCMHAEnabled(conf)) {
+ this.scmRatisSnapshotInfo = new SCMRatisSnapshotInfo(
+ scmStorageConfig.getCurrentDir());
+ this.scmRatisSnapshotDir = SCMHAUtils.createSCMRatisDir(conf);
initializeRatisServer();
} else {
scmRatisServer = null;
@@ -796,6 +805,10 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
getClientRpcAddress()));
}
+ if (scmRatisServer != null) {
+ scmRatisServer.start();
+ }
+
ms = HddsServerUtil
.initializeMetrics(configuration, "StorageContainerManager");
@@ -1137,4 +1150,38 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
}
}
}
+
+ @VisibleForTesting
+ public SCMRatisServer getScmRatisServer() {
+ return scmRatisServer;
+ }
+
+ @VisibleForTesting
+ public SCMRatisSnapshotInfo getSnapshotInfo() {
+ return scmRatisSnapshotInfo;
+ }
+
+ @VisibleForTesting
+ public long getRatisSnapshotIndex() {
+ return scmRatisSnapshotInfo.getIndex();
+ }
+
+ /**
+ * Save ratis snapshot to SCM meta store and local disk.
+ */
+ public TermIndex saveRatisSnapshot() throws IOException {
+ TermIndex snapshotIndex = scmRatisServer.getLastAppliedTermIndex();
+ if (scmMetadataStore != null) {
+ // Flush the SCM state to disk
+ scmMetadataStore.getStore().flush();
+ }
+
+ if (containerManager != null) {
+ containerManager.flushDB();
+ }
+
+ scmRatisSnapshotInfo.saveRatisSnapshotToDisk(snapshotIndex);
+
+ return snapshotIndex;
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMRatisServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMRatisServer.java
similarity index 97%
rename from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMRatisServer.java
rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMRatisServer.java
index af1e5c2..77dee6a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMRatisServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMRatisServer.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.hdds.scm.ratis;
+package org.apache.hadoop.hdds.scm.server.ratis;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
@@ -42,6 +42,7 @@ import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.SizeInBytes;
@@ -94,6 +95,10 @@ public final class SCMRatisServer {
return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
}
+ /**
+ * Creates a SCM Ratis Server.
+ * @throws IOException
+ */
private SCMRatisServer(Configuration conf,
StorageContainerManager scm,
String raftGroupIdStr, RaftPeerId localRaftPeerId,
@@ -139,6 +144,9 @@ public final class SCMRatisServer {
}, roleCheckInitialDelayMs, roleCheckIntervalMs, TimeUnit.MILLISECONDS);
}
+ /**
+ * Create a SCM Ratis Server instance.
+ */
public static SCMRatisServer newSCMRatisServer(
Configuration conf, StorageContainerManager scm,
SCMNodeDetails scmNodeDetails, List<SCMNodeDetails> peers)
@@ -178,7 +186,7 @@ public final class SCMRatisServer {
return new SCMStateMachine(this);
}
- private RaftProperties newRaftProperties(Configuration conf){
+ private RaftProperties newRaftProperties(Configuration conf) {
final RaftProperties properties = new RaftProperties();
// Set RPC type
final String rpcType = conf.get(
@@ -403,6 +411,15 @@ public final class SCMRatisServer {
}
}
+ public StorageContainerManager getSCM() {
+ return scm;
+ }
+
+ @VisibleForTesting
+ public SCMStateMachine getScmStateMachine() {
+ return scmStateMachine;
+ }
+
public int getServerPort() {
return port;
}
@@ -441,6 +458,10 @@ public final class SCMRatisServer {
}
}
+ public TermIndex getLastAppliedTermIndex() {
+ return scmStateMachine.getLastAppliedTermIndex();
+ }
+
private GroupInfoReply getGroupInfo() throws IOException {
GroupInfoRequest groupInfoRequest = new GroupInfoRequest(clientId,
raftPeerId, raftGroupId, nextCallId());
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMRatisSnapshotInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMRatisSnapshotInfo.java
new file mode 100644
index 0000000..11b3234
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMRatisSnapshotInfo.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.server.ratis;
+
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.DumperOptions;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.List;
+
+import static org.apache.hadoop.ozone.OzoneConsts.SCM_RATIS_SNAPSHOT_INDEX;
+
+/**
+ * This class captures the snapshotIndex and term of the latest snapshot in
+ * the SCM.
+ * Ratis server loads the snapshotInfo during startup and updates the
+ * lastApplied index to this snapshotIndex. SCM SnapshotInfo does not contain
+ * any files. It is used only to store/update the last applied index and term.
+ */
+public class SCMRatisSnapshotInfo implements SnapshotInfo {
+
+ static final Logger LOG = LoggerFactory.getLogger(SCMRatisSnapshotInfo.class);
+
+ private volatile long term = 0;
+ private volatile long snapshotIndex = -1;
+
+ private final File ratisSnapshotFile;
+
+ public SCMRatisSnapshotInfo(File ratisDir) throws IOException {
+ ratisSnapshotFile = new File(ratisDir, SCM_RATIS_SNAPSHOT_INDEX);
+ loadRatisSnapshotIndex();
+ }
+
+ public void updateTerm(long newTerm) {
+ term = newTerm;
+ }
+
+ private void updateTermIndex(long newTerm, long newIndex) {
+ this.term = newTerm;
+ this.snapshotIndex = newIndex;
+ }
+
+ /**
+ * Load the snapshot index and term from the snapshot file on disk,
+ * if it exists.
+ * @throws IOException
+ */
+ private void loadRatisSnapshotIndex() throws IOException {
+ if (ratisSnapshotFile.exists()) {
+ RatisSnapshotYaml ratisSnapshotYaml = readRatisSnapshotYaml();
+ updateTermIndex(ratisSnapshotYaml.term, ratisSnapshotYaml.snapshotIndex);
+ }
+ }
+
+ /**
+ * Read and parse the snapshot yaml file.
+ */
+ private RatisSnapshotYaml readRatisSnapshotYaml() throws IOException {
+ try (FileInputStream inputFileStream = new FileInputStream(
+ ratisSnapshotFile)) {
+ Yaml yaml = new Yaml();
+ try {
+ return yaml.loadAs(inputFileStream, RatisSnapshotYaml.class);
+ } catch (Exception e) {
+ throw new IOException("Unable to parse RatisSnapshot yaml file.", e);
+ }
+ }
+ }
+
+ /**
+ * Update and persist the snapshot index and term to disk.
+ * @param lastAppliedTermIndex new term and index to be persisted to disk.
+ * @throws IOException
+ */
+ public void saveRatisSnapshotToDisk(TermIndex lastAppliedTermIndex)
+ throws IOException {
+ updateTermIndex(lastAppliedTermIndex.getTerm(),
+ lastAppliedTermIndex.getIndex());
+ writeRatisSnapshotYaml();
+ LOG.info("Saved Ratis Snapshot on the SCM with snapshotIndex {}",
+ lastAppliedTermIndex);
+ }
+
+ /**
+ * Write snapshot details to disk in yaml format.
+ */
+ private void writeRatisSnapshotYaml() throws IOException {
+ DumperOptions options = new DumperOptions();
+ options.setPrettyFlow(true);
+ options.setDefaultFlowStyle(DumperOptions.FlowStyle.FLOW);
+ Yaml yaml = new Yaml(options);
+
+ RatisSnapshotYaml ratisSnapshotYaml = new RatisSnapshotYaml(term,
+ snapshotIndex);
+
+ try (Writer writer = new OutputStreamWriter(
+ new FileOutputStream(ratisSnapshotFile), "UTF-8")) {
+ yaml.dump(ratisSnapshotYaml, writer);
+ }
+ }
+
+ @Override
+ public TermIndex getTermIndex() {
+ return TermIndex.newTermIndex(term, snapshotIndex);
+ }
+
+ @Override
+ public long getTerm() {
+ return term;
+ }
+
+ @Override
+ public long getIndex() {
+ return snapshotIndex;
+ }
+
+ @Override
+ public List<FileInfo> getFiles() {
+ return null;
+ }
+
+ /**
+ * Ratis Snapshot details to be written to the yaml file.
+ */
+ public static class RatisSnapshotYaml {
+ private long term;
+ private long snapshotIndex;
+
+ public RatisSnapshotYaml() {
+ // Needed for snake-yaml introspection.
+ }
+
+ RatisSnapshotYaml(long term, long snapshotIndex) {
+ this.term = term;
+ this.snapshotIndex = snapshotIndex;
+ }
+
+ public void setTerm(long term) {
+ this.term = term;
+ }
+
+ public long getTerm() {
+ return this.term;
+ }
+
+ public void setSnapshotIndex(long index) {
+ this.snapshotIndex = index;
+ }
+
+ public long getSnapshotIndex() {
+ return this.snapshotIndex;
+ }
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMStateMachine.java
new file mode 100644
index 0000000..b60570b
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMStateMachine.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.server.ratis;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
+import org.apache.ratis.util.LifeCycle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class for SCM StateMachine.
+ */
+public class SCMStateMachine extends BaseStateMachine {
+ static final Logger LOG =
+ LoggerFactory.getLogger(SCMStateMachine.class);
+ private final SimpleStateMachineStorage storage =
+ new SimpleStateMachineStorage();
+ private final SCMRatisServer scmRatisServer;
+ private final StorageContainerManager scm;
+ private RaftGroupId raftGroupId;
+ private final SCMRatisSnapshotInfo snapshotInfo;
+ private final ExecutorService executorService;
+ private final ExecutorService installSnapshotExecutor;
+
+ // Map from index to term for the ratis transactions which are
+ // stateMachine entries received through applyTransaction.
+ private ConcurrentMap<Long, Long> applyTransactionMap =
+ new ConcurrentSkipListMap<>();
+
+ /**
+ * Create a SCM state machine.
+ */
+ public SCMStateMachine(SCMRatisServer ratisServer) {
+ this.scmRatisServer = ratisServer;
+ this.scm = ratisServer.getSCM();
+
+ this.snapshotInfo = scm.getSnapshotInfo();
+ updateLastAppliedIndexWithSnaphsotIndex();
+
+ ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("SCM StateMachine ApplyTransaction Thread - %d").build();
+ this.executorService = HadoopExecutors.newSingleThreadExecutor(build);
+ this.installSnapshotExecutor = HadoopExecutors.newSingleThreadExecutor();
+ }
+
+ /**
+ * Initializes the State Machine with the given server, group and storage.
+ */
+ @Override
+ public void initialize(RaftServer server, RaftGroupId id,
+ RaftStorage raftStorage) throws IOException {
+ lifeCycle.startAndTransition(() -> {
+ super.initialize(server, id, raftStorage);
+ this.raftGroupId = id;
+ storage.init(raftStorage);
+ });
+ }
+
+ /**
+ * Pre-execute the update request into state machine.
+ */
+ @Override
+ public TransactionContext startTransaction(
+ RaftClientRequest raftClientRequest) {
+ return TransactionContext.newBuilder()
+ .setClientRequest(raftClientRequest)
+ .setStateMachine(this)
+ .setServerRole(RaftProtos.RaftPeerRole.LEADER)
+ .setLogData(raftClientRequest.getMessage().getContent())
+ .build();
+ }
+
+ /**
+ * Apply a committed log entry to state machine.
+ */
+ @Override
+ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+ CompletableFuture<Message> ratisFuture =
+ new CompletableFuture<>();
+ //TODO execute SCMRequest and process SCMResponse
+ return ratisFuture;
+ }
+
+ /**
+ * Query state machine.
+ */
+ @Override
+ public CompletableFuture<Message> query(Message request) {
+ //TODO make handler respond to the query request.
+ return CompletableFuture.completedFuture(request);
+ }
+
+ /**
+ * Pause state machine.
+ */
+ @Override
+ public void pause() {
+ lifeCycle.transition(LifeCycle.State.PAUSING);
+ lifeCycle.transition(LifeCycle.State.PAUSED);
+ }
+
+ /**
+ * Unpause state machine and update the lastAppliedIndex.
+ * Intended to be called after uploading new state to the state machine.
+ */
+ public void unpause(long newLastAppliedSnaphsotIndex,
+ long newLastAppliedSnapShotTermIndex) {
+ lifeCycle.startAndTransition(() -> {
+ this.setLastAppliedTermIndex(TermIndex.newTermIndex(
+ newLastAppliedSnapShotTermIndex, newLastAppliedSnaphsotIndex));
+ });
+ }
+
+ /**
+ * Take SCM snapshot and write index to file.
+ * @return the saved snapshot index, or 0 if the SCM reference is null.
+ */
+ @Override
+ public long takeSnapshot() throws IOException {
+ LOG.info("Saving Ratis snapshot on the SCM.");
+ if (scm != null) {
+ return scm.saveRatisSnapshot().getIndex();
+ }
+ return 0;
+ }
+
+ /**
+ * Get latest SCM snapshot.
+ */
+ @Override
+ public SnapshotInfo getLatestSnapshot() {
+ return snapshotInfo;
+ }
+
+ private synchronized void updateLastApplied() {
+ Long appliedTerm = null;
+ long appliedIndex = -1;
+ for(long i = getLastAppliedTermIndex().getIndex() + 1;; i++) {
+ final Long removed = applyTransactionMap.remove(i);
+ if (removed == null) {
+ break;
+ }
+ appliedTerm = removed;
+ appliedIndex = i;
+ }
+ if (appliedTerm != null) {
+ updateLastAppliedTermIndex(appliedTerm, appliedIndex);
+ }
+ }
+
+ /**
+ * Called to notify state machine about indexes which are processed
+ * internally by Raft Server; this currently happens when conf entries are
+ * processed in raft Server. This lets the state machine keep track of index
+ * updates.
+ */
+ @Override
+ public void notifyIndexUpdate(long currentTerm, long index) {
+ applyTransactionMap.put(index, currentTerm);
+ updateLastApplied();
+ snapshotInfo.updateTerm(currentTerm);
+ }
+
+ /**
+ * Notifies the state machine that the raft peer is no longer leader.
+ */
+ @Override
+ public void notifyNotLeader(Collection<TransactionContext> pendingEntries) {
+ scmRatisServer.updateServerRole();
+ }
+
+ /**
+ * Transfer from log entry to string.
+ */
+ @Override
+ public String toStateMachineLogEntryString(
+ RaftProtos.StateMachineLogEntryProto proto) {
+ //TODO implement transfer from proto to SCMRequest body.
+ return null;
+ }
+
+ /**
+ * Set the last applied term and index from the snapshot info.
+ */
+ public void updateLastAppliedIndexWithSnaphsotIndex() {
+ setLastAppliedTermIndex(TermIndex.newTermIndex(snapshotInfo.getTerm(),
+ snapshotInfo.getIndex()));
+ LOG.info("LastAppliedIndex set from SnapShotInfo {}",
+ getLastAppliedTermIndex());
+ }
+
+ @VisibleForTesting
+ void addApplyTransactionTermIndex(long term, long index) {
+ applyTransactionMap.put(index, term);
+ }
+
+ public void stop() {
+ HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS);
+ HadoopExecutors.shutdown(installSnapshotExecutor, LOG, 5, TimeUnit.SECONDS);
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/package-info.java
similarity index 94%
rename from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/package-info.java
rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/package-info.java
index 4944017..77f4afa 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/package-info.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/package-info.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdds.scm.ratis;
+package org.apache.hadoop.hdds.scm.server.ratis;
/**
* This package contains classes related to Apache Ratis for SCM.
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ratis/TestSCMRatisServer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/ratis/TestSCMRatisServer.java
similarity index 86%
rename from hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ratis/TestSCMRatisServer.java
rename to hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/ratis/TestSCMRatisServer.java
index f29fb5f..d6981d3 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ratis/TestSCMRatisServer.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/ratis/TestSCMRatisServer.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.hdds.scm.ratis;
+package org.apache.hadoop.hdds.scm.server.ratis;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -25,7 +25,6 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.util.LifeCycle;
@@ -59,7 +58,6 @@ public class TestSCMRatisServer {
private SCMRatisServer scmRatisServer;
private StorageContainerManager scm;
private String scmId;
- private SCMNodeDetails scmNodeDetails;
private static final long LEADER_ELECTION_TIMEOUT = 500L;
@Before
@@ -69,25 +67,14 @@ public class TestSCMRatisServer {
conf.setTimeDuration(
ScmConfigKeys.OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS);
- int ratisPort = conf.getInt(
- ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY,
- ScmConfigKeys.OZONE_SCM_RATIS_PORT_DEFAULT);
- InetSocketAddress rpcAddress = new InetSocketAddress(
- InetAddress.getLocalHost(), 0);
- scmNodeDetails = new SCMNodeDetails.Builder()
- .setRatisPort(ratisPort)
- .setRpcAddress(rpcAddress)
- .setSCMNodeId(scmId)
- .setSCMServiceId(OzoneConsts.SCM_SERVICE_ID_DEFAULT)
- .build();
+ conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true);
+ conf.set(ScmConfigKeys.OZONE_SCM_INTERNAL_SERVICE_ID, "scm-ha-test");
// Standalone SCM Ratis server
initSCM();
scm = HddsTestUtils.getScm(conf);
scm.start();
- scmRatisServer = SCMRatisServer.newSCMRatisServer(
- conf, scm, scmNodeDetails, Collections.EMPTY_LIST);
- scmRatisServer.start();
+ scmRatisServer = scm.getScmRatisServer();
}
@After
@@ -101,7 +88,7 @@ public class TestSCMRatisServer {
}
@Test
- public void testStartSCMRatisServer() throws Exception {
+ public void testStartSCMRatisServer() {
Assert.assertEquals("Ratis Server should be in running state",
LifeCycle.State.RUNNING, scmRatisServer.getServerState());
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/ratis/TestSCMStateMachine.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/ratis/TestSCMStateMachine.java
new file mode 100644
index 0000000..69bc5bd
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/ratis/TestSCMStateMachine.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.server.ratis;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+
+/**
+ * Test class for SCMStateMachine.
+ */
+public class TestSCMStateMachine {
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ private SCMStateMachine scmStateMachine;
+ private StorageContainerManager scm;
+ private SCMRatisServer scmRatisServer;
+ private OzoneConfiguration conf;
+ private String scmId;
+ @Before
+ public void init() throws Exception {
+ conf = new OzoneConfiguration();
+ conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true);
+ conf.set(ScmConfigKeys.OZONE_SCM_INTERNAL_SERVICE_ID, "scm-ha-test");
+ scmId = UUID.randomUUID().toString();
+
+ initSCM();
+ scm = HddsTestUtils.getScm(conf);
+ scm.start();
+ scmRatisServer = scm.getScmRatisServer();
+ scmStateMachine = scm.getScmRatisServer().getScmStateMachine();
+ }
+
+ @Test
+ public void testSCMUpdatedAppliedIndex(){
+ // State machine should start with 0 term and 0 index.
+ scmStateMachine.notifyIndexUpdate(0, 0);
+ Assert.assertEquals(0,
+ scmStateMachine.getLastAppliedTermIndex().getTerm());
+ Assert.assertEquals(0,
+ scmStateMachine.getLastAppliedTermIndex().getIndex());
+
+ // If only the transactionMap is updated, index should stay 0.
+ scmStateMachine.addApplyTransactionTermIndex(0, 1);
+ Assert.assertEquals(0L,
+ scmStateMachine.getLastAppliedTermIndex().getTerm());
+ Assert.assertEquals(0L,
+ scmStateMachine.getLastAppliedTermIndex().getIndex());
+
+ // After the index update is notified, the index should increase.
+ scmStateMachine.notifyIndexUpdate(0, 1);
+ Assert.assertEquals(0L,
+ scmStateMachine.getLastAppliedTermIndex().getTerm());
+ Assert.assertEquals(1L,
+ scmStateMachine.getLastAppliedTermIndex().getIndex());
+
+ // A notifyIndexUpdate call alone can also increase the index.
+ scmStateMachine.notifyIndexUpdate(0, 2);
+ Assert.assertEquals(0L,
+ scmStateMachine.getLastAppliedTermIndex().getTerm());
+ Assert.assertEquals(2L,
+ scmStateMachine.getLastAppliedTermIndex().getIndex());
+
+ // If a non-consecutive index is notified, the index should not change.
+ scmStateMachine.notifyIndexUpdate(0, 5);
+ Assert.assertEquals(0L,
+ scmStateMachine.getLastAppliedTermIndex().getTerm());
+ Assert.assertEquals(2L,
+ scmStateMachine.getLastAppliedTermIndex().getIndex());
+ }
+
+ private void initSCM() throws IOException {
+ String clusterId = UUID.randomUUID().toString();
+ final String path = folder.newFolder().toString();
+ Path scmPath = Paths.get(path, "scm-meta");
+ Files.createDirectories(scmPath);
+ conf.set(OZONE_METADATA_DIRS, scmPath.toString());
+ SCMStorageConfig scmStore = new SCMStorageConfig(conf);
+ scmStore.setClusterId(clusterId);
+ scmStore.setScmId(scmId);
+ // writes the version file properties
+ scmStore.initialize();
+ }
+
+ @After
+ public void cleanup() {
+ scm.stop();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org