You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by av...@apache.org on 2020/12/06 05:39:40 UTC
[ozone] branch HDDS-3698-upgrade updated: HDDS-4480. Implement OM
Prepare Request/Response. (#1613)
This is an automated email from the ASF dual-hosted git repository.
avijayan pushed a commit to branch HDDS-3698-upgrade
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3698-upgrade by this push:
new a7797aa HDDS-4480. Implement OM Prepare Request/Response. (#1613)
a7797aa is described below
commit a7797aacea8925b4e489f935b4b8344561b10439
Author: Ethan Rose <33...@users.noreply.github.com>
AuthorDate: Sun Dec 6 00:39:24 2020 -0500
HDDS-4480. Implement OM Prepare Request/Response. (#1613)
---
.../hadoop/hdds/ratis/RatisUpgradeUtils.java | 96 ------
.../hadoop/hdds/ratis/TestRatisUpgradeUtils.java | 97 ------
.../main/java/org/apache/hadoop/ozone/OmUtils.java | 1 +
.../hadoop/ozone/om/exceptions/OMException.java | 3 +-
.../hadoop/ozone/om/TestOzoneManagerPrepare.java | 326 +++++++++++++++++++++
.../src/main/proto/OmClientProtocol.proto | 12 +
.../apache/hadoop/ozone/om/OMStarterInterface.java | 2 -
.../org/apache/hadoop/ozone/om/OzoneManager.java | 55 +---
.../hadoop/ozone/om/OzoneManagerStarter.java | 65 ++--
.../hadoop/ozone/om/codec/OMDBDefinition.java | 11 +-
.../ozone/om/request/upgrade/OMPrepareRequest.java | 218 ++++++++++++++
.../om/response/upgrade/OMPrepareResponse.java | 44 +++
.../hadoop/ozone/om/TestOmMetadataManager.java | 9 +-
.../om/ratis/TestOzoneManagerStateMachine.java | 4 +-
.../ozone/om/response/TestCleanupTableInfo.java | 5 +-
15 files changed, 650 insertions(+), 298 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisUpgradeUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisUpgradeUtils.java
deleted file mode 100644
index 796668d..0000000
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisUpgradeUtils.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.hdds.ratis;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerProxy;
-import org.apache.ratis.statemachine.StateMachine;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Ratis utility functions.
- */
-public final class RatisUpgradeUtils {
-
- private RatisUpgradeUtils() {
- }
-
- private static final Logger LOG =
- LoggerFactory.getLogger(RatisUpgradeUtils.class);
-
- /**
- * Flush all committed transactions in a given Raft Server for a given group.
- * @param stateMachine state machine to use
- * @param raftGroup raft group
- * @param server Raft server proxy instance.
- * @param maxTimeToWaitSeconds Max time to wait before declaring failure.
- * @throws InterruptedException when interrupted
- * @throws IOException on error while waiting
- */
- public static void waitForAllTxnsApplied(
- StateMachine stateMachine,
- RaftGroup raftGroup,
- RaftServerProxy server,
- long maxTimeToWaitSeconds,
- long timeBetweenRetryInSeconds)
- throws InterruptedException, IOException {
-
- long intervalTime = TimeUnit.SECONDS.toMillis(timeBetweenRetryInSeconds);
- long endTime = System.currentTimeMillis() +
- TimeUnit.SECONDS.toMillis(maxTimeToWaitSeconds);
- boolean success = false;
- while (System.currentTimeMillis() < endTime) {
- success = checkIfAllTransactionsApplied(stateMachine, server, raftGroup);
- if (success) {
- break;
- }
- Thread.sleep(intervalTime);
- }
-
- if (!success) {
- throw new IOException(String.format("After waiting for %d seconds, " +
- "State Machine has not applied all the transactions.",
- maxTimeToWaitSeconds));
- }
-
- long snapshotIndex = stateMachine.takeSnapshot();
- if (snapshotIndex != stateMachine.getLastAppliedTermIndex().getIndex()) {
- throw new IOException("Index from Snapshot does not match last applied " +
- "Index");
- }
- }
-
- private static boolean checkIfAllTransactionsApplied(
- StateMachine stateMachine,
- RaftServerProxy serverProxy,
- RaftGroup raftGroup) throws IOException {
- LOG.info("Checking for pending transactions to be applied.");
- RaftServerImpl impl = serverProxy.getImpl(raftGroup.getGroupId());
- long lastCommittedIndex = impl.getState().getLog().getLastCommittedIndex();
- long appliedIndex = stateMachine.getLastAppliedTermIndex().getIndex();
- LOG.info("lastCommittedIndex = {}, appliedIndex = {}",
- lastCommittedIndex, appliedIndex);
- return (lastCommittedIndex == appliedIndex);
- }
-
-}
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/ratis/TestRatisUpgradeUtils.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/ratis/TestRatisUpgradeUtils.java
deleted file mode 100644
index 078bbb5..0000000
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/ratis/TestRatisUpgradeUtils.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.hdds.ratis;
-
-import static org.apache.hadoop.hdds.ratis.RatisUpgradeUtils.waitForAllTxnsApplied;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-
-import org.apache.hadoop.test.LambdaTestUtils;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerProxy;
-import org.apache.ratis.server.impl.ServerState;
-import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.statemachine.StateMachine;
-import org.junit.Test;
-
-/**
- * Testing util methods in TestRatisUpgradeUtils.
- */
-public class TestRatisUpgradeUtils {
-
- @Test
- public void testWaitForAllTxnsApplied() throws IOException,
- InterruptedException {
-
- StateMachine stateMachine = mock(StateMachine.class);
- RaftGroup raftGroup = RaftGroup.emptyGroup();
- RaftServerProxy raftServerProxy = mock(RaftServerProxy.class);
- RaftServerImpl raftServer = mock(RaftServerImpl.class);
- ServerState serverState = mock(ServerState.class);
- RaftLog raftLog = mock(RaftLog.class);
-
- when(raftServerProxy.getImpl(
- raftGroup.getGroupId())).thenReturn(raftServer);
- when(raftServer.getState()).thenReturn(serverState);
- when(serverState.getLog()).thenReturn(raftLog);
- when(raftLog.getLastCommittedIndex()).thenReturn(1L);
-
- TermIndex termIndex = mock(TermIndex.class);
- when(termIndex.getIndex()).thenReturn(0L).thenReturn(0L).thenReturn(1L);
- when(stateMachine.getLastAppliedTermIndex()).thenReturn(termIndex);
- when(stateMachine.takeSnapshot()).thenReturn(1L);
-
- waitForAllTxnsApplied(stateMachine, raftGroup, raftServerProxy, 10, 2);
- verify(stateMachine.getLastAppliedTermIndex(),
- times(4)); // 3 checks + 1 after snapshot
- }
-
- @Test
- public void testWaitForAllTxnsAppliedTimeOut() throws Exception {
-
- StateMachine stateMachine = mock(StateMachine.class);
- RaftGroup raftGroup = RaftGroup.emptyGroup();
- RaftServerProxy raftServerProxy = mock(RaftServerProxy.class);
- RaftServerImpl raftServer = mock(RaftServerImpl.class);
- ServerState serverState = mock(ServerState.class);
- RaftLog raftLog = mock(RaftLog.class);
-
- when(raftServerProxy.getImpl(
- raftGroup.getGroupId())).thenReturn(raftServer);
- when(raftServer.getState()).thenReturn(serverState);
- when(serverState.getLog()).thenReturn(raftLog);
- when(raftLog.getLastCommittedIndex()).thenReturn(1L);
-
- TermIndex termIndex = mock(TermIndex.class);
- when(termIndex.getIndex()).thenReturn(0L);
- when(stateMachine.getLastAppliedTermIndex()).thenReturn(termIndex);
- when(stateMachine.takeSnapshot()).thenReturn(1L);
-
- LambdaTestUtils.intercept(IOException.class, "State Machine has not " +
- "applied all the transactions", () ->
- waitForAllTxnsApplied(stateMachine, raftGroup, raftServerProxy,
- 10, 2));
- }
-
-}
\ No newline at end of file
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index 8015fed..fc3e61b 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -290,6 +290,7 @@ public final class OmUtils {
case PurgeKeys:
case RecoverTrash:
case FinalizeUpgrade:
+ case Prepare:
case DeleteOpenKeys:
return false;
default:
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
index 3c28ae6..c1e8148 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
@@ -236,6 +236,7 @@ public class OMException extends IOException {
PERSIST_UPGRADE_TO_LAYOUT_VERSION_FAILED,
REMOVE_UPGRADE_TO_LAYOUT_VERSION_FAILED,
UPDATE_LAYOUT_VERSION_FAILED,
- LAYOUT_FEATURE_FINALIZATION_FAILED;
+ LAYOUT_FEATURE_FINALIZATION_FAILED,
+ PREPARE_FAILED
}
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerPrepare.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerPrepare.java
new file mode 100644
index 0000000..711ec2a
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerPrepare.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.TestHelper;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.ratis.OMTransactionInfo;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareRequest;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test OM prepare against actual mini cluster.
+ */
+public class TestOzoneManagerPrepare extends TestOzoneManagerHA {
+
+ private final String keyPrefix = "key";
+ private final int timeoutMillis = 30000;
+
+ /**
+  * Submits a prepare request to OMs that have no transaction information
+  * and verifies that every OM passes the prepared checks.
+  */
+ @Test
+ public void testPrepareWithoutTransactions() throws Exception {
+   MiniOzoneHAClusterImpl haCluster = getCluster();
+   OzoneManager leaderOM = haCluster.getOMLeader();
+
+   // The transaction ID in the response is the log index of the prepare
+   // request.
+   OMResponse prepareResponse =
+       submitPrepareRequest(leaderOM.getOmRatisServer());
+   long prepareIndex = prepareResponse.getPrepareResponse().getTxnID();
+
+   // Prepare response processing is included in the snapshot,
+   // giving index of 1.
+   Assert.assertEquals(1, prepareIndex);
+
+   for (OzoneManager om: haCluster.getOzoneManagersList()) {
+     if (om != leaderOM) {
+       // Non-leader OMs are polled until their log files are gone before
+       // being checked.
+       waitAndCheckPrepared(om, prepareIndex);
+     } else {
+       // Leader should be prepared as soon as it returns response.
+       checkPrepared(om, prepareIndex);
+     }
+   }
+ }
+
+ /**
+  * Writes data to the cluster and then prepares it.
+  * Checks that every OM is prepared successfully and that every OM still
+  * lists exactly the keys that were written.
+  */
+ @Test
+ public void testPrepareWithTransactions() throws Exception {
+   MiniOzoneHAClusterImpl cluster = getCluster();
+
+   String volumeName = UUID.randomUUID().toString();
+   String bucketName = UUID.randomUUID().toString();
+   Set<String> writtenKeys = new HashSet<>();
+
+   // The client is only needed while writing the test data. Scoping it
+   // with try-with-resources closes it even if one of the writes fails.
+   try (OzoneClient ozClient =
+       OzoneClientFactory.getRpcClient(getConf())) {
+     ObjectStore store = ozClient.getObjectStore();
+
+     store.createVolume(volumeName);
+     OzoneVolume volume = store.getVolume(volumeName);
+     volume.createBucket(bucketName);
+
+     for (int i = 1; i <= 10; i++) {
+       String keyName = keyPrefix + i;
+       writeTestData(store, volumeName, bucketName, keyName);
+       writtenKeys.add(keyName);
+     }
+   }
+
+   // Make sure all OMs have logs from writing data, so we can check that
+   // they are purged after prepare.
+   for (OzoneManager om: cluster.getOzoneManagersList()) {
+     LambdaTestUtils.await(timeoutMillis, 1000,
+         () -> logFilesPresentInRatisPeer(om));
+   }
+
+   OzoneManager leader = cluster.getOMLeader();
+   OMResponse omResponse = submitPrepareRequest(leader.getOmRatisServer());
+   // Get the log index of the prepare request.
+   long prepareRequestLogIndex =
+       omResponse.getPrepareResponse().getTxnID();
+
+   // Make sure all OMs are prepared and all OMs still have their data.
+   for (OzoneManager om: cluster.getOzoneManagersList()) {
+     // Leader should be prepared as soon as it returns response.
+     if (om == leader) {
+       checkPrepared(om, prepareRequestLogIndex);
+     } else {
+       waitAndCheckPrepared(om, prepareRequestLogIndex);
+     }
+
+     List<OmKeyInfo> keys = om.getMetadataManager().listKeys(volumeName,
+         bucketName, null, keyPrefix, 100);
+
+     Assert.assertEquals(writtenKeys.size(), keys.size());
+     for (OmKeyInfo keyInfo: keys) {
+       Assert.assertTrue(writtenKeys.contains(keyInfo.getKeyName()));
+     }
+   }
+ }
+
+ /**
+  * Writes data to the cluster.
+  * Shuts down one OM.
+  * Writes more data to the cluster.
+  * Submits prepare as ratis request.
+  * Checks that two live OMs are prepared.
+  * Revives the third OM.
+  * Checks that third OM received all transactions and is prepared.
+  * @throws Exception if a cluster operation or one of the checks fails.
+  */
+ // TODO: Fix this test so it passes.
+ // @Test
+ public void testPrepareDownedOM() throws Exception {
+   // Index of the OM that will be shut down during this test.
+   final int shutdownOMIndex = 2;
+
+   MiniOzoneHAClusterImpl cluster = getCluster();
+
+   // try-with-resources closes the client when the test finishes,
+   // whether it passes or fails.
+   try (OzoneClient ozClient =
+       OzoneClientFactory.getRpcClient(getConf())) {
+     String volumeName = UUID.randomUUID().toString();
+     String bucketName = UUID.randomUUID().toString();
+     ObjectStore store = ozClient.getObjectStore();
+
+     // Create keys with all 3 OMs up.
+     store.createVolume(volumeName);
+     OzoneVolume volume = store.getVolume(volumeName);
+     volume.createBucket(bucketName);
+
+     Set<String> writtenKeys = new HashSet<>();
+     for (int i = 1; i <= 50; i++) {
+       String keyName = keyPrefix + i;
+       writeTestData(store, volumeName, bucketName, keyName);
+       writtenKeys.add(keyName);
+     }
+
+     // Make sure all OMs have logs from writing data, so we can check that
+     // they are purged after prepare.
+     for (OzoneManager om: cluster.getOzoneManagersList()) {
+       LambdaTestUtils.await(timeoutMillis, 1000,
+           () -> logFilesPresentInRatisPeer(om));
+     }
+
+     // Shut down one OM.
+     cluster.stopOzoneManager(shutdownOMIndex);
+     OzoneManager downedOM = cluster.getOzoneManager(shutdownOMIndex);
+     Assert.assertFalse(downedOM.isRunning());
+
+     // Write keys with the remaining OMs up.
+     for (int i = 51; i <= 100; i++) {
+       String keyName = keyPrefix + i;
+       writeTestData(store, volumeName, bucketName, keyName);
+       writtenKeys.add(keyName);
+     }
+
+     // Submit prepare request via Ratis.
+     OzoneManager leaderOM = cluster.getOMLeader();
+     long prepareIndex = submitPrepareRequest(leaderOM.getOmRatisServer())
+         .getPrepareResponse()
+         .getTxnID();
+
+     // Check that the two live OMs are prepared.
+     for (OzoneManager om: cluster.getOzoneManagersList()) {
+       if (om == leaderOM) {
+         // Leader should have been prepared after we got the response.
+         checkPrepared(om, prepareIndex);
+       } else if (om != downedOM) {
+         // Follower may still be applying transactions.
+         waitAndCheckPrepared(om, prepareIndex);
+       }
+     }
+
+     // Restart the downed OM and wait for it to catch up.
+     // Since prepare was the last Ratis transaction, it should have all
+     // data it missed once it receives the prepare transaction.
+     cluster.restartOzoneManager(downedOM, true);
+     // Wait for other OMs to catch this one up on transactions.
+     LambdaTestUtils.await(timeoutMillis, 1000,
+         () -> downedOM.getRatisSnapshotIndex() == prepareIndex);
+     checkPrepared(downedOM, prepareIndex);
+
+     // Make sure all OMs are prepared and still have data.
+     for (OzoneManager om: cluster.getOzoneManagersList()) {
+       waitAndCheckPrepared(om, prepareIndex);
+
+       List<OmKeyInfo> readKeys = om.getMetadataManager().listKeys(
+           volumeName, bucketName, null, keyPrefix, 100);
+
+       Assert.assertEquals(writtenKeys.size(), readKeys.size());
+       for (OmKeyInfo keyInfo: readKeys) {
+         Assert.assertTrue(writtenKeys.contains(keyInfo.getKeyName()));
+       }
+     }
+   }
+ }
+
+ /**
+  * Checks whether the "current" directory of the given OM's Ratis group
+  * contains any file whose name starts with "log".
+  *
+  * @param om the OM whose Ratis storage directory is inspected.
+  * @return true if at least one such file exists; false otherwise,
+  *     including when the directory cannot be listed.
+  */
+ private boolean logFilesPresentInRatisPeer(OzoneManager om) {
+   String ratisDir = om.getOmRatisServer().getServer().getProperties()
+       .get("raft.server.storage.dir");
+   String groupIdDirName =
+       om.getOmRatisServer().getServer().getGroupIds().iterator()
+           .next().getUuid().toString();
+   File logDir = Paths.get(ratisDir, groupIdDirName, "current")
+       .toFile();
+
+   // File#listFiles returns null when the path is not a directory or an
+   // I/O error occurs. Report "no log files" in that case instead of
+   // failing with a NullPointerException in the loop below.
+   File[] files = logDir.listFiles();
+   if (files == null) {
+     return false;
+   }
+
+   for (File file : files) {
+     if (file.getName().startsWith("log")) {
+       return true;
+     }
+   }
+   return false;
+ }
+
+ /**
+  * Creates a key in the given bucket and writes test data to it. The data
+  * is derived from a random UUID string.
+  *
+  * @param store object store used to create the key.
+  * @param volumeName volume containing the bucket.
+  * @param bucketName bucket in which the key is created.
+  * @param keyName name of the key to create.
+  * @throws Exception if creating, writing or closing the key fails.
+  */
+ private void writeTestData(ObjectStore store, String volumeName,
+     String bucketName, String keyName)
+     throws Exception {
+   String keyString = UUID.randomUUID().toString();
+   byte[] data = ContainerTestHelper.getFixedLengthString(
+       keyString, 100).getBytes(UTF_8);
+   // try-with-resources closes the stream even when the write fails.
+   try (OzoneOutputStream keyStream = TestHelper.createKey(
+       keyName, ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
+       100, store, volumeName, bucketName)) {
+     keyStream.write(data);
+   }
+ }
+
+ /**
+  * Builds an OM request of type {@code Prepare} that carries an empty
+  * {@code PrepareRequest} and a random UUID as its client ID.
+  *
+  * @return the assembled request.
+  */
+ private OMRequest buildPrepareRequest() {
+   OMRequest.Builder requestBuilder = OMRequest.newBuilder();
+   requestBuilder.setPrepareRequest(PrepareRequest.newBuilder().build());
+   requestBuilder.setCmdType(Type.Prepare);
+   requestBuilder.setClientId(UUID.randomUUID().toString());
+   return requestBuilder.build();
+ }
+
+ /**
+  * Polls until the given OM has no Ratis log files left, then runs the
+  * prepared checks against it.
+  *
+  * @param om the OM to wait for and check.
+  * @param prepareRequestLogIndex log index of the prepare request.
+  * @throws Exception if the wait or one of the checks fails.
+  */
+ private void waitAndCheckPrepared(OzoneManager om,
+     long prepareRequestLogIndex) throws Exception {
+   // Log files are deleted after the snapshot is taken, so once they are
+   // gone the OM should be prepared.
+   LambdaTestUtils.await(timeoutMillis, 1000, () -> {
+     boolean logsRemain = logFilesPresentInRatisPeer(om);
+     return !logsRemain;
+   });
+   checkPrepared(om, prepareRequestLogIndex);
+ }
+
+ /**
+  * Asserts that the given OM has no Ratis log files left and that the
+  * transaction info stored in its DB matches the prepare request.
+  *
+  * @param om the OM to check.
+  * @param prepareRequestLogIndex log index of the prepare request; 0 means
+  *     no transaction info is expected in the DB.
+  * @throws Exception if reading the transaction info table fails.
+  */
+ private void checkPrepared(OzoneManager om, long prepareRequestLogIndex)
+     throws Exception {
+   Assert.assertFalse(logFilesPresentInRatisPeer(om));
+
+   // If no transactions have been persisted to the DB, transaction info
+   // will be null, not zero.
+   // This will cause a null pointer exception if we use
+   // OzoneManager#getRatisSnapshotIndex to get the index from the DB.
+   OMTransactionInfo txnInfo = om.getMetadataManager()
+       .getTransactionInfoTable().get(TRANSACTION_INFO_KEY);
+   if (prepareRequestLogIndex == 0) {
+     Assert.assertNull(txnInfo);
+   } else {
+     // Fail with an assertion rather than a NullPointerException when the
+     // transaction info is unexpectedly missing.
+     Assert.assertNotNull(txnInfo);
+     // JUnit expects (expected, actual); this order keeps the failure
+     // message accurate.
+     Assert.assertEquals(prepareRequestLogIndex,
+         txnInfo.getTransactionIndex());
+   }
+ }
+
+ /**
+  * Builds a prepare request and submits it to the given OM Ratis server
+  * as a Ratis write request.
+  *
+  * @param server the OM Ratis server that receives the request.
+  * @return the OM response returned by the server.
+  * @throws Exception if submitting the request fails.
+  */
+ private OMResponse submitPrepareRequest(OzoneManagerRatisServer server)
+     throws Exception {
+   // Reuse the shared builder instead of assembling the request twice.
+   OMRequest omRequest = buildPrepareRequest();
+
+   // NOTE(review): the literal 0 is presumably the Ratis call ID and the
+   // trailing null the sliding window entry; confirm against the Ratis
+   // version in use.
+   RaftClientRequest raftClientRequest = new RaftClientRequest(
+       ClientId.randomId(),
+       server.getRaftPeerId(),
+       server.getRaftGroupId(),
+       0,
+       Message.valueOf(OMRatisHelper.convertRequestToByteString(omRequest)),
+       RaftClientRequest.writeRequestType(), null);
+
+   return server.submitRequest(omRequest, raftClientRequest);
+ }
+}
\ No newline at end of file
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 72cea50..5a90a75 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -74,6 +74,7 @@ enum Type {
DBUpdates = 53;
FinalizeUpgrade = 54;
FinalizeUpgradeProgress = 55;
+ Prepare = 56;
GetDelegationToken = 61;
RenewDelegationToken = 62;
@@ -145,6 +146,7 @@ message OMRequest {
optional DBUpdatesRequest dbUpdatesRequest = 53;
optional FinalizeUpgradeRequest finalizeUpgradeRequest = 54;
optional FinalizeUpgradeProgressRequest finalizeUpgradeProgressRequest = 55;
+ optional PrepareRequest prepareRequest = 56;
optional hadoop.common.GetDelegationTokenRequestProto getDelegationTokenRequest = 61;
optional hadoop.common.RenewDelegationTokenRequestProto renewDelegationTokenRequest= 62;
@@ -220,6 +222,7 @@ message OMResponse {
optional DBUpdatesResponse dbUpdatesResponse = 52;
optional FinalizeUpgradeResponse finalizeUpgradeResponse = 54;
optional FinalizeUpgradeProgressResponse finalizeUpgradeProgressResponse = 55;
+ optional PrepareResponse prepareResponse = 56;
optional GetDelegationTokenResponseProto getDelegationTokenResponse = 61;
optional RenewDelegationTokenResponseProto renewDelegationTokenResponse = 62;
@@ -329,6 +332,7 @@ enum Status {
REMOVE_UPGRADE_TO_LAYOUT_VERSION_FAILED = 68;
UPDATE_LAYOUT_VERSION_FAILED = 69;
LAYOUT_FEATURE_FINALIZATION_FAILED = 70;
+ PREPARE_FAILED = 71;
}
/**
@@ -1074,6 +1078,14 @@ message FinalizeUpgradeProgressResponse {
required UpgradeFinalizationStatus status = 1;
}
+// Request asking the OM to prepare. It carries no fields.
+message PrepareRequest {
+
+}
+
+// Response to a PrepareRequest.
+message PrepareResponse {
+ // Transaction ID of the prepare request. The integration test added in
+ // this change reads it as the log index of that request.
+ required uint64 txnID = 1;
+}
+
message ServicePort {
enum Type {
RPC = 1;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStarterInterface.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStarterInterface.java
index 14252a7..f632ad1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStarterInterface.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStarterInterface.java
@@ -30,6 +30,4 @@ public interface OMStarterInterface {
AuthenticationException;
boolean init(OzoneConfiguration conf) throws IOException,
AuthenticationException;
- boolean prepareForUpgrade(OzoneConfiguration conf) throws IOException,
- AuthenticationException;
}
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index bee4861..e5cee22 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -200,7 +200,6 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT;
import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients;
import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
-import static org.apache.hadoop.hdds.ratis.RatisUpgradeUtils.waitForAllTxnsApplied;
import static org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest.getEncodedString;
import static org.apache.hadoop.hdds.server.ServerUtils.getRemoteUserName;
import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
@@ -224,12 +223,10 @@ import static org.apache.hadoop.ozone.OzoneConsts.RPC_PORT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS_DEFAULT;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_FLUSH_TXNS_RETRY_INTERVAL_SECONDS;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HANDLER_COUNT_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_KEYTAB_FILE_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MAX_TIME_TO_WAIT_FLUSH_TXNS;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_METRICS_SAVE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_METRICS_SAVE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME;
@@ -246,7 +243,6 @@ import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.
import org.apache.hadoop.util.Time;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
-import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.FileUtils;
@@ -341,7 +337,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private final boolean useRatisForReplication;
private boolean isNativeAuthorizerEnabled;
- private boolean prepareForUpgrade;
+ private boolean isPrepared;
private ExitManager exitManager;
@@ -356,6 +352,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private State omState;
private Thread emptier;
+ // TODO: Utilize the forUpgrade parameter to remove the marker file and
+ // take the OM out of prepare mode on startup.
+ @SuppressWarnings("methodlength")
private OzoneManager(OzoneConfiguration conf, boolean forUpgrade)
throws IOException, AuthenticationException {
super(OzoneVersionInfo.OZONE_VERSION_INFO);
@@ -504,8 +503,11 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
};
ShutdownHookManager.get().addShutdownHook(shutdownHook,
SHUTDOWN_HOOK_PRIORITY);
- this.prepareForUpgrade = forUpgrade;
omState = State.INITIALIZED;
+
+ // TODO: When marker file is added, check for that on startup to
+ // determine prepare mode.
+ this.isPrepared = false;
}
private void logVersionMismatch(OzoneConfiguration conf, ScmInfo scmInfo) {
@@ -1020,39 +1022,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
}
}
- public boolean applyAllPendingTransactions()
- throws InterruptedException, IOException {
-
- if (!isRatisEnabled) {
- LOG.info("Ratis not enabled. Nothing to do.");
- return true;
- }
-
- waitForAllTxnsApplied(omRatisServer.getOmStateMachine(),
- omRatisServer.getRaftGroup(),
- (RaftServerProxy) omRatisServer.getServer(),
- OZONE_OM_MAX_TIME_TO_WAIT_FLUSH_TXNS,
- OZONE_OM_FLUSH_TXNS_RETRY_INTERVAL_SECONDS);
-
- long appliedIndexFromRatis =
- omRatisServer.getOmStateMachine().getLastAppliedTermIndex().getIndex();
- OMTransactionInfo omTransactionInfo =
- OMTransactionInfo.readTransactionInfo(metadataManager);
- long index = omTransactionInfo.getTermIndex().getIndex();
- if (index != appliedIndexFromRatis) {
- throw new IllegalStateException(
- String.format("Cannot prepare OM for Upgrade " +
- "since transaction info table index %d does not match ratis %s",
- index, appliedIndexFromRatis));
- }
-
- LOG.info("OM has been prepared for upgrade. All transactions " +
- "upto {} have been flushed to the state machine, " +
- "and a snapshot has been taken.",
- omRatisServer.getOmStateMachine().getLastAppliedTermIndex().getIndex());
- return true;
- }
-
/**
* Initializes secure OzoneManager.
*/
@@ -1239,10 +1208,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
LOG.error("OM HttpServer failed to start.", ex);
}
- if (!prepareForUpgrade) {
- omRpcServer.start();
- isOmRpcServerRunning = true;
- }
+ omRpcServer.start();
+ isOmRpcServerRunning = true;
// TODO: Start this thread only on the leader node.
// Should be fixed after HDDS-4451.
@@ -1253,7 +1220,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
startJVMPauseMonitor();
setStartTime();
- if (!prepareForUpgrade) {
+ if (!isPrepared) {
omState = State.RUNNING;
} else {
omState = State.PREPARING_FOR_UPGRADE;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerStarter.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerStarter.java
index aac97fa..76e2ced 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerStarter.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerStarter.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import picocli.CommandLine;
import picocli.CommandLine.Command;
@@ -100,28 +99,29 @@ public class OzoneManagerStarter extends GenericCli {
}
}
-
- /**
- * This function implements a sub-command to allow the OM to be
- * "prepared for upgrade".
- */
- @CommandLine.Command(name = "--prepareForUpgrade",
- aliases = {"--prepareForDowngrade", "--flushTransactions"},
- customSynopsis = "ozone om [global options] --prepareForUpgrade",
- hidden = false,
- description = "Prepare the OM for upgrade/downgrade. (Flush Raft log " +
- "transactions.)",
- mixinStandardHelpOptions = true,
- versionProvider = HddsVersionProvider.class)
- @SuppressFBWarnings("DM_EXIT")
- public void prepareOmForUpgrade() throws Exception {
- commonInit();
- boolean result = receiver.prepareForUpgrade(conf);
- if (!result) {
- throw new Exception("Prepare OM For Upgrade failed.");
- }
- System.exit(0);
- }
+ // TODO: Convert this flag to bring the OM out of prepare mode with the new
+ // bits when prepare marker files have been implemented.
+// /**
+// * This function implements a sub-command to allow the OM to be
+// * "prepared for upgrade".
+// */
+// @CommandLine.Command(name = "--prepareForUpgrade",
+// aliases = {"--prepareForDowngrade", "--flushTransactions"},
+// customSynopsis = "ozone om [global options] --prepareForUpgrade",
+// hidden = false,
+// description = "Prepare the OM for upgrade/downgrade. (Flush Raft log " +
+// "transactions.)",
+// mixinStandardHelpOptions = true,
+// versionProvider = HddsVersionProvider.class)
+// @SuppressFBWarnings("DM_EXIT")
+// public void prepareOmForUpgrade() throws Exception {
+// commonInit();
+// boolean result = receiver.prepareForUpgrade(conf);
+// if (!result) {
+// throw new Exception("Prepare OM For Upgrade failed.");
+// }
+// System.exit(0);
+// }
/**
* This function should be called by each command to ensure the configuration
@@ -143,7 +143,7 @@ public class OzoneManagerStarter extends GenericCli {
* testing.
*/
static class OMStarterHelper implements OMStarterInterface{
-
+ @Override
public void start(OzoneConfiguration conf) throws IOException,
AuthenticationException {
OzoneManager om = OzoneManager.createOm(conf);
@@ -151,26 +151,11 @@ public class OzoneManagerStarter extends GenericCli {
om.join();
}
+ @Override
public boolean init(OzoneConfiguration conf) throws IOException,
AuthenticationException {
return OzoneManager.omInit(conf);
}
-
- public boolean prepareForUpgrade(OzoneConfiguration conf)
- throws IOException, AuthenticationException {
- try (OzoneManager om = OzoneManager.createOmUpgradeMode(conf)) {
- om.start();
- boolean success = false;
- try {
- LOG.info("Preparing OM for upgrade.");
- success = om.applyAllPendingTransactions();
- } catch (InterruptedException e) {
- LOG.error("Error preparing OM for upgrade.", e);
- Thread.currentThread().interrupt();
- }
- return success;
- }
- }
}
}
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
index 1161fd0..1302922 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
@@ -69,15 +69,6 @@ public class OMDBDefinition implements DBDefinition {
OmVolumeArgs.class,
new OmVolumeArgsCodec());
- public static final DBColumnFamilyDefinition<String, String>
- S3_TABLE=
- new DBColumnFamilyDefinition<>(
- "s3Table",
- String.class,
- new StringCodec(),
- String.class,
- new StringCodec());
-
public static final DBColumnFamilyDefinition<String, OmKeyInfo>
OPEN_KEY_TABLE =
new DBColumnFamilyDefinition<>(
@@ -164,7 +155,7 @@ public class OMDBDefinition implements DBDefinition {
@Override
public DBColumnFamilyDefinition[] getColumnFamilies() {
return new DBColumnFamilyDefinition[] {DELETED_TABLE, USER_TABLE,
- VOLUME_TABLE, S3_TABLE, OPEN_KEY_TABLE, KEY_TABLE,
+ VOLUME_TABLE, OPEN_KEY_TABLE, KEY_TABLE,
BUCKET_TABLE, MULTIPART_INFO_TABLE, PREFIX_TABLE, DTOKEN_TABLE,
S3_SECRET_TABLE, TRANSACTION_INFO_TABLE};
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
new file mode 100644
index 0000000..021d7df
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.upgrade;
+
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.ratis.OMTransactionInfo;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.upgrade.OMPrepareResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+
+import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.statemachine.StateMachine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * OM Request used to flush all transactions to disk, take a DB snapshot, and
+ * purge the logs, leaving Ratis in a clean state without unapplied log
+ * entries. This prepares the OM for upgrades/downgrades so that no request
+ * in the log is applied to the database in the old version of the code in one
+ * OM, and the new version of the code on another OM.
+ */
+public class OMPrepareRequest extends OMClientRequest {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(OMPrepareRequest.class);
+
+ // Allow the double buffer this long to flush all transactions before
+ // returning an error to the caller.
+ private static final Duration DOUBLE_BUFFER_FLUSH_TIMEOUT =
+ Duration.of(5, ChronoUnit.MINUTES);
+ // Time between checks to see if double buffer finished flushing.
+ private static final Duration DOUBLE_BUFFER_FLUSH_CHECK_INTERVAL =
+ Duration.of(1, ChronoUnit.SECONDS);
+
+ /**
+ * Creates a prepare request that wraps the given OM request.
+ *
+ * @param omRequest the request, passed through to the parent constructor.
+ */
+ public OMPrepareRequest(OMRequest omRequest) {
+ super(omRequest);
+ }
+
+  /**
+   * Applies the prepare request: queues this request's response in the
+   * double buffer, waits until that index has been flushed to the DB, then
+   * takes a state machine snapshot and purges the Ratis log.
+   *
+   * @param ozoneManager OM whose Ratis server and metadata manager are used.
+   * @param transactionLogIndex Ratis log index of this prepare request.
+   * @param ozoneManagerDoubleBufferHelper helper used to add the response to
+   *     the double buffer.
+   * @return a response carrying the prepare transaction ID on success, or an
+   *     error response if waiting, snapshotting or purging failed.
+   */
+  @Override
+  public OMClientResponse validateAndUpdateCache(
+      OzoneManager ozoneManager, long transactionLogIndex,
+      OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
+
+    LOG.info("Received prepare request with log index {}", transactionLogIndex);
+
+    OMResponse.Builder responseBuilder =
+        OmResponseUtil.getOMResponseBuilder(getOmRequest());
+    responseBuilder.setCmdType(Type.Prepare);
+    OMClientResponse response = null;
+
+    try {
+      // Create response.
+      PrepareResponse omResponse = PrepareResponse.newBuilder()
+          .setTxnID(transactionLogIndex)
+          .build();
+      responseBuilder.setPrepareResponse(omResponse);
+      response = new OMPrepareResponse(responseBuilder.build());
+
+      // Add response to double buffer before clearing logs.
+      // This guarantees the log index of this request will be the same as
+      // the snapshot index in the prepared state.
+      ozoneManagerDoubleBufferHelper.add(response, transactionLogIndex);
+
+      OzoneManagerRatisServer omRatisServer = ozoneManager.getOmRatisServer();
+      RaftServerProxy server = (RaftServerProxy) omRatisServer.getServer();
+      RaftServerImpl serverImpl =
+          server.getImpl(omRatisServer.getRaftGroup().getGroupId());
+
+      // Wait for outstanding double buffer entries to flush to disk,
+      // so they will not be purged from the log before being persisted to
+      // the DB.
+      // Since the response for this request was added to the double buffer
+      // already, once this index reaches the state machine, we know all
+      // transactions have been flushed.
+      waitForLogIndex(transactionLogIndex,
+          ozoneManager.getMetadataManager(), serverImpl);
+      takeSnapshotAndPurgeLogs(serverImpl);
+
+      // TODO: Create marker file with txn index.
+
+      LOG.info("OM prepared at log index {}. Returning response {}",
+          ozoneManager.getRatisSnapshotIndex(), omResponse);
+    } catch (OMException e) {
+      LOG.error("Prepare request failed at log index {}.",
+          transactionLogIndex, e);
+      response = new OMPrepareResponse(
+          createErrorOMResponse(responseBuilder, e));
+    } catch (InterruptedException | IOException e) {
+      LOG.error("Prepare request failed at log index {}.",
+          transactionLogIndex, e);
+      if (e instanceof InterruptedException) {
+        // The interrupt is converted into an error response below, so
+        // restore the flag for code further up the call stack.
+        Thread.currentThread().interrupt();
+      }
+      // Set error code so that prepare failure does not cause the OM to
+      // terminate.
+      response = new OMPrepareResponse(
+          createErrorOMResponse(responseBuilder, new OMException(e,
+              OMException.ResultCodes.PREPARE_FAILED)));
+    }
+
+    return response;
+  }
+
+  /**
+   * Waits for the specified index to be flushed to the state machine on
+   * disk, and to be updated in memory in Ratis.
+   * Polls once per {@link #DOUBLE_BUFFER_FLUSH_CHECK_INTERVAL} until
+   * {@link #DOUBLE_BUFFER_FLUSH_TIMEOUT} has elapsed.
+   *
+   * @param indexToWaitFor log index that must be present in the DB's
+   *     transaction info entry.
+   * @param metadataManager used to read the transaction info table.
+   * @param server Ratis server whose state machine's last applied index is
+   *     checked.
+   * @throws InterruptedException if interrupted while sleeping between
+   *     checks.
+   * @throws IOException if the index is not reached before the timeout, or
+   *     the transaction info table cannot be read.
+   */
+  private static void waitForLogIndex(long indexToWaitFor,
+      OMMetadataManager metadataManager, RaftServerImpl server)
+      throws InterruptedException, IOException {
+
+    long endTime = System.currentTimeMillis() +
+        DOUBLE_BUFFER_FLUSH_TIMEOUT.toMillis();
+    boolean success = false;
+
+    while (!success && System.currentTimeMillis() < endTime) {
+      // If no transactions have been persisted to the DB, transaction info
+      // will be null, not zero, causing a null pointer exception within
+      // ozoneManager#getRatisSnapshotIndex.
+      // Get the transaction directly instead to handle the case when it is
+      // null.
+      OMTransactionInfo dbTxnInfo = metadataManager
+          .getTransactionInfoTable().get(TRANSACTION_INFO_KEY);
+      long ratisTxnIndex =
+          server.getStateMachine().getLastAppliedTermIndex().getIndex();
+
+      // Ratis may apply meta transactions after the prepare request, causing
+      // its in memory index to always be greater than the DB index.
+      if (dbTxnInfo == null) {
+        // If there are no transactions in the DB, we are prepared to log
+        // index 0 only.
+        success = (indexToWaitFor == 0)
+            && (ratisTxnIndex >= indexToWaitFor);
+      } else {
+        success = (dbTxnInfo.getTransactionIndex() == indexToWaitFor)
+            && (ratisTxnIndex >= indexToWaitFor);
+      }
+
+      if (!success) {
+        Thread.sleep(DOUBLE_BUFFER_FLUSH_CHECK_INTERVAL.toMillis());
+      }
+    }
+
+    // If the timeout waiting for all transactions to reach the state machine
+    // is exceeded, the exception is propagated, resulting in an error response
+    // to the client. They can retry the prepare request.
+    if (!success) {
+      // Report the timeout in seconds, matching the unit named in the message.
+      throw new IOException(String.format("After waiting for %d seconds, " +
+              "State Machine has not applied all the transactions.",
+          DOUBLE_BUFFER_FLUSH_TIMEOUT.getSeconds()));
+    }
+  }
+
+  /**
+   * Take a snapshot of the state machine at the last index, and purge ALL logs.
+   * @param impl RaftServerImpl instance
+   * @return the index at which the snapshot was taken.
+   * @throws IOException if the snapshot index does not match the last log
+   *     index, if the purge fails or is interrupted, or if the purge index
+   *     does not match the snapshot index.
+   */
+  public static long takeSnapshotAndPurgeLogs(RaftServerImpl impl)
+      throws IOException {
+
+    StateMachine stateMachine = impl.getStateMachine();
+    long snapshotIndex = stateMachine.takeSnapshot();
+    RaftLog raftLog = impl.getState().getLog();
+    long raftLogIndex = raftLog.getLastEntryTermIndex().getIndex();
+
+    // Ensure that Ratis's in memory snapshot index is the same as the index
+    // of its last log entry.
+    if (snapshotIndex != raftLogIndex) {
+      throw new IOException("Snapshot index " + snapshotIndex + " does not " +
+          "match last log index " + raftLogIndex);
+    }
+
+    CompletableFuture<Long> purgeFuture =
+        raftLog.syncWithSnapshot(snapshotIndex);
+    long purgeIndex;
+    try {
+      purgeIndex = purgeFuture.get();
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag before converting to an IOException.
+      Thread.currentThread().interrupt();
+      throw new IOException("Unable to purge logs.", e);
+    } catch (Exception e) {
+      throw new IOException("Unable to purge logs.", e);
+    }
+
+    // Checked outside the try block so this specific error is not caught and
+    // re-wrapped as a generic purge failure.
+    if (purgeIndex != snapshotIndex) {
+      throw new IOException("Purge index " + purgeIndex +
+          " does not match last index " + snapshotIndex);
+    }
+
+    return snapshotIndex;
+  }
+
+ /**
+ * @return the name of the {@code Type.Prepare} command type.
+ */
+ public static String getRequestType() {
+ return Type.Prepare.name();
+ }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/upgrade/OMPrepareResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/upgrade/OMPrepareResponse.java
new file mode 100644
index 0000000..cac9049
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/upgrade/OMPrepareResponse.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.upgrade;
+
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+
+import java.io.IOException;
+
+/**
+ * Response for prepare request.
+ * <p>
+ * Declares no cleanup tables and adds nothing to the DB batch.
+ */
+@CleanupTableInfo()
+public class OMPrepareResponse extends OMClientResponse {
+ /**
+ * @param omResponse the response, passed through to the parent constructor.
+ */
+ public OMPrepareResponse(
+ OzoneManagerProtocolProtos.OMResponse omResponse) {
+ super(omResponse);
+ }
+
+ /**
+ * Intentionally empty: this response adds nothing to the batch.
+ *
+ * @param omMetadataManager unused.
+ * @param batchOperation unused.
+ */
+ @Override
+ protected void addToDBBatch(OMMetadataManager omMetadataManager,
+ BatchOperation batchOperation) throws IOException {
+ // The Prepare request will create a marker file to indicate the OM is
+ // prepared, so there is no DB update for the prepare operation.
+ }
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
index 71193c9..2364005 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
@@ -33,6 +33,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -44,8 +45,6 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRE
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
/**
* Tests OzoneManager MetadataManager.
@@ -616,10 +615,10 @@ public class TestOmMetadataManager {
@Test
public void testAllTablesAreProperInOMMetadataManagerImpl() {
- String[] tablesByDefinition = OmMetadataManagerImpl.ALL_TABLES;
-
+ Set<String> tablesByDefinition =
+ new HashSet<>(Arrays.asList(OmMetadataManagerImpl.ALL_TABLES));
Set<String> tablesInManager = omMetadataManager.listTableNames();
- assertThat(tablesInManager, containsInAnyOrder(tablesByDefinition));
+ Assert.assertEquals(tablesByDefinition, tablesInManager);
}
}
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
index 285c992..a0a21b9 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
@@ -123,8 +123,8 @@ public class TestOzoneManagerStateMachine {
public void testApplyTransactionsUpdateLastAppliedIndexCalledLate() {
// Now try a scenario where 1,2,3 transactions are in applyTransactionMap
// and updateLastAppliedIndex is not called for them, and before that
- // notifyIndexUpdate is called with transaction 4. And see now at the end
- // when updateLastAppliedIndex is called with epochs we have
+ // notifyTermIndexUpdated is called with transaction 4. And see now at the
+ // end when updateLastAppliedIndex is called with epochs we have
// lastAppliedIndex as 4 or not.
// Conf/metadata transaction.
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
index 90c08cb..99944ce 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.ozone.om.request.file.OMFileCreateRequest;
import org.apache.hadoop.ozone.om.request.key.OMKeyCreateRequest;
import org.apache.hadoop.ozone.om.response.file.OMFileCreateResponse;
import org.apache.hadoop.ozone.om.response.key.OMKeyCreateResponse;
+import org.apache.hadoop.ozone.om.response.upgrade.OMPrepareResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateFileRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateKeyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
@@ -155,7 +156,9 @@ public class TestCleanupTableInfo {
Assert.assertTrue(
Arrays.stream(cleanupTables).allMatch(tables::contains)
);
- } else {
+ } else if (aClass != OMPrepareResponse.class) {
+ // Prepare response is allowed to have no cleanup tables, since it
+ // does not modify the DB.
Assert.assertTrue(cleanupAll);
}
});
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org