You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2019/04/04 05:50:39 UTC
[hadoop] branch trunk updated: HDDS-1339. Implement ratis snapshots
on OM (#651)
This is an automated email from the ASF dual-hosted git repository.
hanishakoneru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new f09a78f HDDS-1339. Implement ratis snapshots on OM (#651)
f09a78f is described below
commit f09a78f73fbb5bed5bac87ab86354b63bb605e5b
Author: Hanisha Koneru <ko...@gmail.com>
AuthorDate: Wed Apr 3 22:50:28 2019 -0700
HDDS-1339. Implement ratis snapshots on OM (#651)
---
.../java/org/apache/hadoop/ozone/OzoneConsts.java | 3 +
.../org/apache/hadoop/ozone/common/Storage.java | 2 +-
.../java/org/apache/hadoop/utils/db/DBStore.java | 6 ++
.../java/org/apache/hadoop/utils/db/RDBStore.java | 11 ++
.../common/src/main/resources/ozone-default.xml | 13 ++-
.../org/apache/hadoop/ozone/om/OMConfigKeys.java | 6 ++
.../ozone/om/protocol/OzoneManagerHAProtocol.java | 10 +-
.../hadoop/ozone/MiniOzoneHAClusterImpl.java | 4 +
.../apache/hadoop/ozone/om/TestOzoneManagerHA.java | 90 +++++++++++++++-
.../org/apache/hadoop/ozone/om/OzoneManager.java | 45 +++++++-
.../ozone/om/ratis/OzoneManagerRatisServer.java | 28 +++--
.../ozone/om/ratis/OzoneManagerStateMachine.java | 115 +++++++++++++--------
12 files changed, 265 insertions(+), 68 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 8e3b02a..3e15241 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -279,4 +279,7 @@ public final class OzoneConsts {
// Dummy OMNodeID for OM Clients to use for a non-HA OM setup
public static final String OM_NODE_ID_DUMMY = "omNodeIdDummy";
+
+ // OM Ratis snapshot file to store the last applied index
+ public static final String OM_RATIS_SNAPSHOT_INDEX = "ratisSnapshotIndex";
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Storage.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Storage.java
index 9ad87ae..f393ed9 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Storage.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Storage.java
@@ -145,7 +145,7 @@ public abstract class Storage {
*
* @return the directory path
*/
- private File getCurrentDir() {
+ public File getCurrentDir() {
return new File(storageDir, STORAGE_DIR_CURRENT);
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java
index 0bc30d0..56166ab 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java
@@ -66,6 +66,12 @@ public interface DBStore extends AutoCloseable {
ArrayList<Table> listTables() throws IOException;
/**
+ * Flush the DB buffer onto persistent storage.
+ * @throws IOException
+ */
+ void flush() throws IOException;
+
+ /**
* Compact the entire database.
*
* @throws IOException on Failure
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java
index 9a7119e..5bb0fa4 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java
@@ -273,6 +273,17 @@ public class RDBStore implements DBStore {
}
@Override
+ public void flush() throws IOException {
+ final FlushOptions flushOptions = new FlushOptions().setWaitForFlush(true);
+ try {
+ db.flush(flushOptions);
+ } catch (RocksDBException e) {
+ LOG.error("Unable to Flush RocksDB data", e);
+ throw toIOException("Unable to Flush RocksDB data", e);
+ }
+ }
+
+ @Override
public DBCheckpoint getCheckpoint(boolean flush) {
final FlushOptions flushOptions = new FlushOptions().setWaitForFlush(flush);
try {
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index cbd249a..5580548 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1603,19 +1603,28 @@
<property>
<name>ozone.om.ratis.log.appender.queue.num-elements</name>
<value>1024</value>
- <tag>OZONE, DEBUG, CONTAINER, RATIS</tag>
+ <tag>OZONE, DEBUG, OM, RATIS</tag>
<description>Number of operation pending with Raft's Log Worker.
</description>
</property>
<property>
<name>ozone.om.ratis.log.appender.queue.byte-limit</name>
<value>32MB</value>
- <tag>OZONE, DEBUG, CONTAINER, RATIS</tag>
+ <tag>OZONE, DEBUG, OM, RATIS</tag>
<description>Byte limit for Raft's Log Worker queue.
</description>
</property>
<property>
+ <name>ozone.om.ratis.snapshot.auto.trigger.threshold</name>
+ <value>400000</value>
+ <tag>OZONE, DEBUG, OM, RATIS</tag>
+ <description>The log index threshold after which ratis will auto trigger
+ a snapshot on the OM state machine.
+ </description>
+ </property>
+
+ <property>
<name>ozone.om.ratis.server.request.timeout</name>
<value>3s</value>
<tag>OZONE, OM, RATIS, MANAGEMENT</tag>
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 7b13471..60dde44 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -136,6 +136,12 @@ public final class OMConfigKeys {
public static final String
OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT = "32MB";
+ // OM Snapshot configurations
+ public static final String OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY
+ = "ozone.om.ratis.snapshot.auto.trigger.threshold";
+ public static final long
+ OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_DEFAULT
+ = 400000;
// OM Ratis server configurations
public static final String OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_KEY
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
index 8357df2..f598997 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
@@ -37,6 +37,14 @@ import java.io.IOException;
public interface OzoneManagerHAProtocol {
/**
+ * Store the snapshot index i.e. the raft log index, corresponding to the
+ * last transaction applied to the OM RocksDB, in OM metadata dir on disk.
+ * @return the snapshot index
+ * @throws IOException
+ */
+ long saveRatisSnapshot() throws IOException;
+
+ /**
* Add a allocate block, it is assumed that the client is having an open
* key session going on. This block will be appended to this open key session.
* This will be called only during HA enabled OM, as during HA we get an
@@ -56,7 +64,6 @@ public interface OzoneManagerHAProtocol {
OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID,
KeyLocation keyLocation) throws IOException;
-
/**
* Add the openKey entry with given keyInfo and clientID in to openKeyTable.
* This will be called only from applyTransaction, once after calling
@@ -81,5 +88,4 @@ public interface OzoneManagerHAProtocol {
*/
OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs omKeyArgs,
String multipartUploadID) throws IOException;
-
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
index f84f95e..03c2a2c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
@@ -87,6 +87,10 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
return this.ozoneManagers.get(index);
}
+ public OzoneManager getOzoneManager(String omNodeId) {
+ return this.ozoneManagerMap.get(omNodeId);
+ }
+
@Override
public void restartOzoneManager() throws IOException {
for (OzoneManager ozoneManager : ozoneManagers) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
index 93c120a..06009e2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
@@ -76,6 +76,7 @@ public class TestOzoneManagerHA {
private String clusterId;
private String scmId;
private int numOfOMs = 3;
+ private static final long SNAPSHOT_THRESHOLD = 50;
@Rule
public ExpectedException exception = ExpectedException.none();
@@ -99,7 +100,9 @@ public class TestOzoneManagerHA {
conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
conf.setInt(OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY, 10);
conf.setInt(OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 10);
-
+ conf.setLong(
+ OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
+ SNAPSHOT_THRESHOLD);
cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
.setClusterId(clusterId)
.setScmId(scmId)
@@ -326,9 +329,8 @@ public class TestOzoneManagerHA {
throw e;
}
}
-
-
}
+
/**
* Create a volume and test its attribute.
*/
@@ -370,8 +372,6 @@ public class TestOzoneManagerHA {
}
}
-
-
/**
* Test that OMFailoverProxyProvider creates an OM proxy for each OM in the
* cluster.
@@ -533,4 +533,84 @@ public class TestOzoneManagerHA {
proxyProvider.getCurrentProxyOMNodeId());
}
}
+
+ @Test
+ public void testOMRatisSnapshot() throws Exception {
+ String userName = "user" + RandomStringUtils.randomNumeric(5);
+ String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+ String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+ String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+
+ VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+ .setOwner(userName)
+ .setAdmin(adminName)
+ .build();
+
+ objectStore.createVolume(volumeName, createVolumeArgs);
+ OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
+
+ retVolumeinfo.createBucket(bucketName);
+ OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName);
+
+ String leaderOMNodeId = objectStore.getClientProxy().getOMProxyProvider()
+ .getCurrentProxyOMNodeId();
+ OzoneManager ozoneManager = cluster.getOzoneManager(leaderOMNodeId);
+
+ // Send commands to ratis to increase the log index so that ratis
+ // triggers a snapshot on the state machine.
+
+ long appliedLogIndex = 0;
+ while (appliedLogIndex <= SNAPSHOT_THRESHOLD) {
+ createKey(ozoneBucket);
+ appliedLogIndex = ozoneManager.getOmRatisServer()
+ .getStateMachineLastAppliedIndex();
+ }
+
+ GenericTestUtils.waitFor(() -> {
+ if (ozoneManager.loadRatisSnapshotIndex() > 0) {
+ return true;
+ }
+ return false;
+ }, 1000, 100000);
+
+ // The current lastAppliedLogIndex on the state machine should be greater
+ // than or equal to the saved snapshot index.
+ long smLastAppliedIndex =
+ ozoneManager.getOmRatisServer().getStateMachineLastAppliedIndex();
+ long ratisSnapshotIndex = ozoneManager.loadRatisSnapshotIndex();
+ Assert.assertTrue("LastAppliedIndex on OM State Machine ("
+ + smLastAppliedIndex + ") is less than the saved snapshot index("
+ + ratisSnapshotIndex + ").",
+ smLastAppliedIndex >= ratisSnapshotIndex);
+
+ // Add more transactions to Ratis to trigger another snapshot
+ while (appliedLogIndex <= (smLastAppliedIndex + SNAPSHOT_THRESHOLD)) {
+ createKey(ozoneBucket);
+ appliedLogIndex = ozoneManager.getOmRatisServer()
+ .getStateMachineLastAppliedIndex();
+ }
+
+ GenericTestUtils.waitFor(() -> {
+ if (ozoneManager.loadRatisSnapshotIndex() > 0) {
+ return true;
+ }
+ return false;
+ }, 1000, 100000);
+
+ // The new snapshot index must be greater than the previous snapshot index
+ long ratisSnapshotIndexNew = ozoneManager.loadRatisSnapshotIndex();
+ Assert.assertTrue("Latest snapshot index must be greater than previous " +
+ "snapshot indices", ratisSnapshotIndexNew > ratisSnapshotIndex);
+
+ }
+
+ private void createKey(OzoneBucket ozoneBucket) throws IOException {
+ String keyName = "key" + RandomStringUtils.randomNumeric(5);
+ String data = "data" + RandomStringUtils.randomNumeric(5);
+ OzoneOutputStream ozoneOutputStream = ozoneBucket.createKey(keyName,
+ data.length(), ReplicationType.STAND_ALONE,
+ ReplicationFactor.ONE, new HashMap<>());
+ ozoneOutputStream.write(data.getBytes(), 0, data.length());
+ ozoneOutputStream.close();
+ }
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index b8da717..7a87b53 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRe
import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.util.PersistentLongFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client;
@@ -179,6 +180,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_FILE;
import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_TEMP_FILE;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_RATIS_SNAPSHOT_INDEX;
import static org.apache.hadoop.ozone.OzoneConsts.RPC_PORT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys
@@ -233,11 +235,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private RPC.Server omRpcServer;
private InetSocketAddress omRpcAddress;
private String omId;
- private OMNodeDetails omNodeDetails;
private List<OMNodeDetails> peerNodes;
- private boolean isRatisEnabled;
- private OzoneManagerRatisServer omRatisServer;
- private OzoneManagerRatisClient omRatisClient;
private final OMMetadataManager metadataManager;
private final VolumeManager volumeManager;
private final BucketManager bucketManager;
@@ -266,6 +264,13 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private volatile boolean isOmRpcServerRunning = false;
private String omComponent;
+ private boolean isRatisEnabled;
+ private OzoneManagerRatisServer omRatisServer;
+ private OzoneManagerRatisClient omRatisClient;
+ private OMNodeDetails omNodeDetails;
+ private final File ratisSnapshotFile;
+ private long snapshotIndex;
+
private KeyProviderCryptoExtension kmsProvider = null;
private static String keyProviderUriKeyName =
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;
@@ -306,6 +311,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
startRatisServer();
startRatisClient();
+ this.ratisSnapshotFile = new File(omStorage.getCurrentDir(),
+ OM_RATIS_SNAPSHOT_INDEX);
+ this.snapshotIndex = loadRatisSnapshotIndex();
+
InetSocketAddress omNodeRpcAddr = omNodeDetails.getRpcAddress();
omRpcAddressTxt = new Text(omNodeDetails.getRpcAddressString());
@@ -1307,6 +1316,33 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
}
}
+ @VisibleForTesting
+ public long loadRatisSnapshotIndex() {
+ if (ratisSnapshotFile.exists()) {
+ try {
+ return PersistentLongFile.readFile(ratisSnapshotFile, 0);
+ } catch (IOException e) {
+ LOG.error("Unable to read the ratis snapshot index (last applied " +
+ "transaction log index)", e);
+ }
+ }
+ return 0;
+ }
+
+ @Override
+ public long saveRatisSnapshot() throws IOException {
+ snapshotIndex = omRatisServer.getStateMachineLastAppliedIndex();
+
+ // Flush the OM state to disk
+ getMetadataManager().getStore().flush();
+
+ PersistentLongFile.writeFile(ratisSnapshotFile, snapshotIndex);
+ LOG.info("Saved Ratis Snapshot on the OM with snapshotIndex {}",
+ snapshotIndex);
+
+ return snapshotIndex;
+ }
+
/**
* Stop service.
*/
@@ -2103,7 +2139,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
}
}
-
@Override
public OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID,
KeyLocation keyLocation) throws IOException {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index 01979e4..b16f9f2 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OMNodeDetails;
-import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.RaftClientConfigKeys;
@@ -60,7 +59,6 @@ import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.SizeInBytes;
@@ -83,6 +81,7 @@ public final class OzoneManagerRatisServer {
private final RaftPeerId raftPeerId;
private final OzoneManagerServerProtocol ozoneManager;
+ private final OzoneManagerStateMachine omStateMachine;
private final ClientId clientId = ClientId.randomId();
private final ScheduledExecutorService scheduledRoleChecker;
@@ -130,11 +129,13 @@ public final class OzoneManagerRatisServer {
LOG.info("Instantiating OM Ratis server with GroupID: {} and " +
"Raft Peers: {}", raftGroupIdStr, raftPeersStr.toString().substring(2));
+ this.omStateMachine = getStateMachine();
+
this.server = RaftServer.newBuilder()
.setServerId(this.raftPeerId)
.setGroup(this.raftGroup)
.setProperties(serverProperties)
- .setStateMachine(getStateMachine(this.raftGroupId))
+ .setStateMachine(omStateMachine)
.build();
// Run a scheduler to check and update the server role on the leader
@@ -156,7 +157,7 @@ public final class OzoneManagerRatisServer {
* Creates an instance of OzoneManagerRatisServer.
*/
public static OzoneManagerRatisServer newOMRatisServer(
- Configuration ozoneConf, OzoneManager om,
+ Configuration ozoneConf, OzoneManagerServerProtocol omProtocol,
OMNodeDetails omNodeDetails, List<OMNodeDetails> peerNodes)
throws IOException {
@@ -186,7 +187,7 @@ public final class OzoneManagerRatisServer {
raftPeers.add(raftPeer);
}
- return new OzoneManagerRatisServer(ozoneConf, om, omServiceId,
+ return new OzoneManagerRatisServer(ozoneConf, omProtocol, omServiceId,
localRaftPeerId, ratisAddr, raftPeers);
}
@@ -197,7 +198,7 @@ public final class OzoneManagerRatisServer {
/**
* Returns OzoneManager StateMachine.
*/
- private BaseStateMachine getStateMachine(RaftGroupId gid) {
+ private OzoneManagerStateMachine getStateMachine() {
return new OzoneManagerStateMachine(this);
}
@@ -382,10 +383,13 @@ public final class OzoneManagerRatisServer {
this.roleCheckInitialDelayMs = leaderElectionMinTimeout
.toLong(TimeUnit.MILLISECONDS);
- /**
- * TODO: when ratis snapshots are implemented, set snapshot threshold and
- * queue size.
- */
+ long snapshotAutoTriggerThreshold = conf.getLong(
+ OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
+ OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_DEFAULT);
+ RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(
+ properties, true);
+ RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(
+ properties, snapshotAutoTriggerThreshold);
return properties;
}
@@ -517,4 +521,8 @@ public final class OzoneManagerRatisServer {
private UUID getRaftGroupIdFromOmServiceId(String omServiceId) {
return UUID.nameUUIDFromBytes(omServiceId.getBytes(StandardCharsets.UTF_8));
}
+
+ public long getStateMachineLastAppliedIndex() {
+ return omStateMachine.getLastAppliedIndex();
+ }
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index 420ffb5..2f3445a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -69,6 +69,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
private final OzoneManagerServerProtocol ozoneManager;
private RequestHandler handler;
private RaftGroupId raftGroupId;
+ private long lastAppliedIndex = 0;
public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
this.omRatisServer = ratisServer;
@@ -95,6 +96,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
* should be rejected.
* @throws IOException thrown by the state machine while validating
*/
+ @Override
public TransactionContext startTransaction(
RaftClientRequest raftClientRequest) throws IOException {
ByteString messageContent = raftClientRequest.getMessage().getContent();
@@ -115,7 +117,63 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
return ctxt;
}
return handleStartTransactionRequests(raftClientRequest, omRequest);
+ }
+ /*
+ * Apply a committed log entry to the state machine.
+ */
+ @Override
+ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+ try {
+ OMRequest request = OMRatisHelper.convertByteStringToOMRequest(
+ trx.getStateMachineLogEntry().getLogData());
+ long trxLogIndex = trx.getLogEntry().getIndex();
+ CompletableFuture<Message> future = CompletableFuture
+ .supplyAsync(() -> runCommand(request, trxLogIndex));
+ return future;
+ } catch (IOException e) {
+ return completeExceptionally(e);
+ }
+ }
+
+ /**
+ * Query the state machine. The request must be read-only.
+ */
+ @Override
+ public CompletableFuture<Message> query(Message request) {
+ try {
+ OMRequest omRequest = OMRatisHelper.convertByteStringToOMRequest(
+ request.getContent());
+ return CompletableFuture.completedFuture(queryCommand(omRequest));
+ } catch (IOException e) {
+ return completeExceptionally(e);
+ }
+ }
+
+ /**
+ * Take OM Ratis snapshot. Write the snapshot index to file. Snapshot index
+ * is the log index corresponding to the last applied transaction on the OM
+ * State Machine.
+ *
+ * @return the last applied index on the state machine which has been
+ * stored in the snapshot file.
+ */
+ @Override
+ public long takeSnapshot() throws IOException {
+ LOG.info("Saving Ratis snapshot on the OM.");
+ if (ozoneManager != null) {
+ return ozoneManager.saveRatisSnapshot();
+ }
+ return 0;
+ }
+
+ /**
+ * Notifies the state machine that the raft peer is no longer leader.
+ */
+ @Override
+ public void notifyNotLeader(Collection<TransactionContext> pendingEntries)
+ throws IOException {
+ omRatisServer.updateServerRole();
}
/**
@@ -142,10 +200,8 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
.setLogData(raftClientRequest.getMessage().getContent())
.build();
}
-
}
-
private TransactionContext handleInitiateMultipartUpload(
RaftClientRequest raftClientRequest, OMRequest omRequest) {
@@ -237,7 +293,6 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
.build();
}
-
/**
* Handle AllocateBlock Request, which needs a special handling. This
* request needs to be executed on the leader, where it connects to SCM and
@@ -250,7 +305,6 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
RaftClientRequest raftClientRequest, OMRequest omRequest) {
OMResponse omResponse = handler.handle(omRequest);
-
// If request is failed, no need to proceed further.
// Setting the exception with omResponse message and code.
@@ -270,7 +324,6 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
return transactionContext;
}
-
// Get original request
OzoneManagerProtocolProtos.AllocateBlockRequest allocateBlockRequest =
omRequest.getAllocateBlockRequest();
@@ -294,7 +347,6 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
.setServerRole(RaftProtos.RaftPeerRole.LEADER)
.setLogData(messageContent)
.build();
-
}
/**
@@ -308,56 +360,33 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
STATUS_CODE + omResponse.getStatus());
}
- /*
- * Apply a committed log entry to the state machine.
- */
- @Override
- public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
- try {
- OMRequest request = OMRatisHelper.convertByteStringToOMRequest(
- trx.getStateMachineLogEntry().getLogData());
- CompletableFuture<Message> future = CompletableFuture
- .supplyAsync(() -> runCommand(request));
- return future;
- } catch (IOException e) {
- return completeExceptionally(e);
- }
- }
-
- /**
- * Query the state machine. The request must be read-only.
- */
- @Override
- public CompletableFuture<Message> query(Message request) {
- try {
- OMRequest omRequest = OMRatisHelper.convertByteStringToOMRequest(
- request.getContent());
- return CompletableFuture.completedFuture(runCommand(omRequest));
- } catch (IOException e) {
- return completeExceptionally(e);
- }
- }
-
/**
- * Notifies the state machine that the raft peer is no longer leader.
+ * Submits write request to OM and returns the response Message.
+ * @param request OMRequest
+ * @param trxLogIndex Ratis log index of the transaction being applied
+ * @return response from OM
*/
- @Override
- public void notifyNotLeader(Collection<TransactionContext> pendingEntries)
- throws IOException {
- omRatisServer.updateServerRole();
+ private Message runCommand(OMRequest request, long trxLogIndex) {
+ OMResponse response = handler.handle(request);
+ lastAppliedIndex = trxLogIndex;
+ return OMRatisHelper.convertResponseToMessage(response);
}
/**
- * Submits request to OM and returns the response Message.
+ * Submits read request to OM and returns the response Message.
* @param request OMRequest
* @return response from OM
* @throws ServiceException
*/
- private Message runCommand(OMRequest request) {
+ private Message queryCommand(OMRequest request) {
OMResponse response = handler.handle(request);
return OMRatisHelper.convertResponseToMessage(response);
}
+ public long getLastAppliedIndex() {
+ return lastAppliedIndex;
+ }
+
private static <T> CompletableFuture<T> completeExceptionally(Exception e) {
final CompletableFuture<T> future = new CompletableFuture<>();
future.completeExceptionally(e);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org