Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2020/12/15 23:54:05 UTC

[GitHub] [ozone] errose28 opened a new pull request #1705: HDDS-4569. Add pre append gate and marker file to OM prepare state.

errose28 opened a new pull request #1705:
URL: https://github.com/apache/ozone/pull/1705


   ## What changes were proposed in this pull request?
   
   Add a gate to the pre-append step of OzoneManagerStateMachine and to the submit-request step of OzoneManagerRatisServer that blocks further write requests once an Ozone Manager (OM) is prepared. Also add a marker file that is written when the OM is prepared, so that if the OM is restarted without a special flag, it automatically re-enters prepare mode.
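
The gate and marker file described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in this pull request: the class name `PrepareGateSketch`, the `RequestType` enum, and the marker path handling are hypothetical, and the sketch leaves out Ratis, status tracking, and deletion of a faulty marker file.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical, simplified stand-in for the OM prepare state. */
final class PrepareGateSketch {
  static final long NO_PREPARE_INDEX = -1;

  /** Hypothetical request types; the real OM defines many more. */
  enum RequestType { PREPARE, CREATE_VOLUME }

  private final Path markerFile;
  private boolean gateEnabled = false;
  private long prepareIndex = NO_PREPARE_INDEX;

  PrepareGateSketch(Path markerFile) {
    this.markerFile = markerFile;
  }

  /** Persists the prepare index to the marker file, then closes the gate. */
  synchronized void finishPrepare(long index) {
    try {
      Path parent = markerFile.toAbsolutePath().getParent();
      if (parent != null) {
        // Succeeds even when the directory already exists.
        Files.createDirectories(parent);
      }
      Files.write(markerFile,
          Long.toString(index).getBytes(StandardCharsets.UTF_8));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    gateEnabled = true;
    prepareIndex = index;
  }

  /**
   * Called on restart. Re-enters prepare mode only if the marker file
   * exists and holds the expected index; otherwise the gate stays open.
   */
  synchronized boolean restorePrepare(long expectedIndex) {
    gateEnabled = false;
    prepareIndex = NO_PREPARE_INDEX;
    if (!Files.exists(markerFile)) {
      return false;
    }
    try {
      String text = new String(
          Files.readAllBytes(markerFile), StandardCharsets.UTF_8).trim();
      long index = Long.parseLong(text);
      if (index != expectedIndex) {
        return false;
      }
      gateEnabled = true;
      prepareIndex = index;
      return true;
    } catch (IOException | NumberFormatException e) {
      // Unreadable or malformed marker: treat the OM as not prepared.
      return false;
    }
  }

  /** With the gate closed, only prepare requests pass. */
  synchronized boolean requestAllowed(RequestType type) {
    return !gateEnabled || type == RequestType.PREPARE;
  }

  synchronized long getPrepareIndex() {
    return prepareIndex;
  }
}
```

In this sketch, a second `PrepareGateSketch` built on the same marker path plays the role of a restarted OM: calling `restorePrepare` with the index that was written closes its gate again, while any other index leaves it open.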
   
   This pull request updates the Ratis snapshot to the latest version, which includes the necessary RATIS-1216 fix, and addresses backwards-incompatible changes made as a result of the update.
   
   ## What is the link to the Apache JIRA
   
   HDDS-4569
   
   ## How was this patch tested?
   
   Unit and integration tests added.
   
   ## Notes
   
   Leaving as draft until some intermittent failures with the OM restart integration test can be addressed, and HDDS-4564 is merged.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] avijayanhwx merged pull request #1705: HDDS-4569. Add pre append gate and marker file to OM prepare state.

Posted by GitBox <gi...@apache.org>.
avijayanhwx merged pull request #1705:
URL: https://github.com/apache/ozone/pull/1705


   




[GitHub] [ozone] errose28 commented on a change in pull request #1705: HDDS-4569. Add pre append gate and marker file to OM prepare state.

Posted by GitBox <gi...@apache.org>.
errose28 commented on a change in pull request #1705:
URL: https://github.com/apache/ozone/pull/1705#discussion_r547527184



##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerPrepare.java
##########
@@ -138,75 +112,111 @@ public void testPrepareWithTransactions() throws Exception {
    */
 //  @Test
   public void testPrepareDownedOM() throws Exception {
+    setup();
     // Index of the OM that will be shut down during this test.
     final int shutdownOMIndex = 2;
-
-    MiniOzoneHAClusterImpl cluster = getCluster();
-    OzoneClient ozClient = OzoneClientFactory.getRpcClient(getConf());
-
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
-    ObjectStore store = ozClient.getObjectStore();
+    List<OzoneManager> runningOms = cluster.getOzoneManagersList();
 
     // Create keys with all 3 OMs up.
-    store.createVolume(volumeName);
-    OzoneVolume volume = store.getVolume(volumeName);
-    volume.createBucket(bucketName);
-
-    Set<String> writtenKeys = new HashSet<>();
-    for (int i = 1; i <= 50; i++) {
-      String keyName = keyPrefix + i;
-      writeTestData(store, volumeName, bucketName, keyName);
-      writtenKeys.add(keyName);
-    }
-
-    // Make sure all OMs have logs from writing data, so we can check that
-    // they are purged after prepare.
-    for (OzoneManager om: cluster.getOzoneManagersList()) {
-      LambdaTestUtils.await(timeoutMillis, 1000,
-          () -> logFilesPresentInRatisPeer(om));
-    }
+    Set<String> writtenKeys = writeKeysAndWaitForLogs(10, runningOms);
 
     // Shut down one OM.
     cluster.stopOzoneManager(shutdownOMIndex);
     OzoneManager downedOM = cluster.getOzoneManager(shutdownOMIndex);
     Assert.assertFalse(downedOM.isRunning());
+    Assert.assertEquals(runningOms.remove(shutdownOMIndex), downedOM);
 
     // Write keys with the remaining OMs up.
-    for (int i = 51; i <= 100; i++) {
-      String keyName = keyPrefix + i;
-      writeTestData(store, volumeName, bucketName, keyName);
-      writtenKeys.add(keyName);
-    }
+    writtenKeys.addAll(
+        writeKeysAndWaitForLogs(10, runningOms));
 
-    OzoneManagerProtocol ozoneManagerClient =
-        ozClient.getObjectStore().getClientProxy().getOzoneManagerClient();
-    long prepareIndex = ozoneManagerClient.prepareOzoneManager(
-        PREPARE_FLUSH_WAIT_TIMEOUT_SECONDS, PREPARE_FLUSH_INTERVAL_SECONDS);
+    long prepareIndex = submitPrepareRequest();
 
     // Check that the two live OMs are prepared.
-    for (OzoneManager om: cluster.getOzoneManagersList()) {
-      if (om != downedOM) {
-        // Follower may still be applying transactions.
-        waitAndCheckPrepared(om, prepareIndex);
-      }
-    }
+    assertClusterPrepared(prepareIndex, runningOms);
 
     // Restart the downed OM and wait for it to catch up.
     // Since prepare was the last Ratis transaction, it should have all data
     // it missed once it receives the prepare transaction.
     cluster.restartOzoneManager(downedOM, true);
-    LambdaTestUtils.await(timeoutMillis, 2000,
-        () -> checkPrepared(downedOM, prepareIndex));
+    runningOms.add(shutdownOMIndex, downedOM);
 
     // Make sure all OMs are prepared and still have data.
-    for (OzoneManager om: cluster.getOzoneManagersList()) {
-      List<OmKeyInfo> readKeys = om.getMetadataManager().listKeys(volumeName,
-          bucketName, null, keyPrefix, 100);
+    assertClusterPrepared(prepareIndex, runningOms);
+    assertKeysWritten(writtenKeys, runningOms);
+  }
+
+  @Test
+  public void testPrepareWithRestart() throws Exception {
+    setup();
+    writeKeysAndWaitForLogs(10);
+    long prepareIndex = submitPrepareRequest();
+    assertClusterPrepared(prepareIndex);
+
+    // Restart all ozone managers.
+    cluster.restartOzoneManager();
+
+    // No check for cleared logs, since Ratis meta transactions may slip in
+    // on restart.
+    assertClusterPrepared(prepareIndex);
+  }
 
-      Assert.assertEquals(writtenKeys.size(), readKeys.size());
-      for (OmKeyInfo keyInfo: readKeys) {
-        Assert.assertTrue(writtenKeys.contains(keyInfo.getKeyName()));
+  /**
+   * Issues requests on ten different threads, for which one is a prepare and
+   * the rest are create volume. We cannot be sure of the exact order that
+   * the requests will execute, so this test checks that the cluster ends in
+   * a prepared state, and that create volume requests either succeed, or fail
+   * indicating the cluster was prepared before they were encountered.
+   * @throws Exception
+   */
+  @Test
+  public void testPrepareWithMultipleThreads() throws Exception {
+    setup();
+    final int numThreads = 10;
+    final int prepareTaskIndex = 5;
+
+    ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+    // For the prepare task, the future will return a log index.
+    // For the create volume tasks, 0 (dummy value) will be returned.
+    List<Future<Long>> tasks = new ArrayList<>();
+
+    for (int i = 0; i < numThreads; i++) {
+      Callable<Long> task;
+      if (i == prepareTaskIndex) {
+        task = this::submitPrepareRequest;
+      } else {
+        String volumeName = VOLUME + i;
+        task = () -> {
+          clientProtocol.createVolume(volumeName);
+          return 0L;
+        };
+      }
+      tasks.add(executorService.submit(task));
+    }
+
+    // For each task, wait for it to complete and check its result.
+    for (int i = 0; i < numThreads; i++) {
+      Future<Long> future = tasks.get(i);
+
+      if (i == prepareTaskIndex) {
+        assertClusterPrepared(future.get());
+        assertRatisLogsCleared();
+      } else {
+        try {
+          // If this throws an exception, it should be an OMException
+          // indicating failure because the cluster was already prepared.
+          // If no exception is thrown, the volume should be created.
+          future.get();
+          String volumeName = VOLUME + i;
+          Assert.assertTrue(clientProtocol.listVolumes(volumeName, "", 1)
+              .stream()
+              .anyMatch((vol) -> vol.getName().equals(volumeName)));
+        } catch (ExecutionException ex) {
+          Throwable cause = ex.getCause();
+          Assert.assertTrue(cause instanceof OMException);
+          Assert.assertEquals(OMException.ResultCodes.NOT_SUPPORTED_OPERATION,
+              ((OMException) cause).getResult());
+        }
       }
     }

Review comment:
       Yes, just fixed in latest revision.






[GitHub] [ozone] errose28 commented on a change in pull request #1705: HDDS-4569. Add pre append gate and marker file to OM prepare state.

Posted by GitBox <gi...@apache.org>.
errose28 commented on a change in pull request #1705:
URL: https://github.com/apache/ozone/pull/1705#discussion_r548023762



##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerPrepareState.java
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerStateMachine;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse.PrepareStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+
+/**
+ * Controls the prepare state of the {@link OzoneManager} containing the
+ * instance. When prepared, an ozone manager should have no Ratis logs
+ * remaining, disallow all write requests except prepare and cancel prepare,
+ * and have a marker file present on disk that will cause it to remain prepared
+ * on restart.
+ */
+public final class OzoneManagerPrepareState {
+  public static final long NO_PREPARE_INDEX = -1;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OzoneManagerPrepareState.class);
+
+  private boolean prepareGateEnabled;
+  private long prepareIndex;
+  private PrepareStatus status;
+  private final ConfigurationSource conf;
+
+  public OzoneManagerPrepareState(ConfigurationSource conf) {
+    prepareGateEnabled = false;
+    prepareIndex = NO_PREPARE_INDEX;
+    status = PrepareStatus.PREPARE_NOT_STARTED;
+    this.conf = conf;
+  }
+
+  /**
+   * Turns on the prepare gate flag, clears the prepare index, and moves the
+   * prepare status to {@link PrepareStatus#PREPARE_IN_PROGRESS}.
+   *
+   * Turning on the prepare gate flag will enable a gate in the
+   * {@link OzoneManagerStateMachine#preAppendTransaction} (called on leader
+   * OM only) and {@link OzoneManagerRatisServer#submitRequest}
+   * (called on all OMs) that block write requests from reaching the OM and
+   * fail them with error responses to the client.
+   */
+  public synchronized void enablePrepareGate() {
+    prepareGateEnabled = true;
+    prepareIndex = NO_PREPARE_INDEX;
+    status = PrepareStatus.PREPARE_IN_PROGRESS;
+  }
+
+  /**
+   * Removes the prepare marker file, clears the prepare index, turns off
+   * the prepare gate, and moves the prepare status to
+   * {@link PrepareStatus#PREPARE_NOT_STARTED}.
+   * This can be called from any state to clear the current prepare state.
+   *
+   * @throws IOException If the prepare marker file exists but cannot be
+   * deleted.
+   */
+  public synchronized void cancelPrepare()
+      throws IOException {
+    deletePrepareMarkerFile();
+    prepareIndex = NO_PREPARE_INDEX;
+    prepareGateEnabled = false;
+    status = PrepareStatus.PREPARE_NOT_STARTED;
+  }
+
+  /**
+   * Enables the prepare gate, writes the prepare marker file, sets the in
+   * memory prepare index, and
+   * moves the prepare status to {@link PrepareStatus#PREPARE_COMPLETED}.
+   * This can be called from any state to move the OM into prepare mode.
+   *
+   * @param index The log index to prepare the OM on.
+   * @throws IOException If the marker file cannot be written.
+   */
+  public synchronized void finishPrepare(long index) throws IOException {
+    finishPrepare(index, true);
+  }
+
+  private void finishPrepare(long index, boolean writeFile) throws IOException {
+    // Enabling the prepare gate is idempotent, and may have already been
+    // performed if we are the leader. If we are a follower, we must ensure
+    // this is run now in case we become the leader.
+    enablePrepareGate();
+
+    if (writeFile) {
+      writePrepareMarkerFile(index);
+    }
+    prepareIndex = index;
+    status = PrepareStatus.PREPARE_COMPLETED;
+  }
+
+  /**
+   * Uses the on disk marker file to determine the OM's prepare state.
+   * If the marker file exists and contains an index matching {@code
+   * expectedPrepareIndex}, the necessary steps will be taken to finish
+   * preparation and the state will be moved to
+   * {@link PrepareStatus#PREPARE_COMPLETED}.
+   * Else, the status will be moved to
+   * {@link PrepareStatus#PREPARE_NOT_STARTED} and any preparation steps will
+   * be cancelled.
+   *
+   * @return The status the OM is in after this method call.
+   * @throws IOException If the marker file cannot be read, and it cannot be
+   * deleted as part of moving to the
+   * {@link PrepareStatus#PREPARE_NOT_STARTED} state.
+   */
+  public synchronized PrepareStatus restorePrepare(long expectedPrepareIndex)
+      throws IOException {
+    boolean prepareIndexRead = true;
+    long prepareMarkerIndex = NO_PREPARE_INDEX;
+
+    File prepareMarkerFile = getPrepareMarkerFile();
+    if (prepareMarkerFile.exists()) {
+      byte[] data = new byte[(int) prepareMarkerFile.length()];
+      try(FileInputStream stream = new FileInputStream(prepareMarkerFile)) {
+        stream.read(data);
+      } catch (IOException e) {
+        LOG.error("Failed to read prepare marker file {} while restoring OM.",
+            prepareMarkerFile.getAbsolutePath());
+        prepareIndexRead = false;
+      }
+
+      try {
+        prepareMarkerIndex = Long.parseLong(
+            new String(data, StandardCharsets.UTF_8));
+      } catch (NumberFormatException e) {
+        LOG.error("Failed to parse log index from prepare marker file {} " +
+            "while restoring OM.", prepareMarkerFile.getAbsolutePath());
+        prepareIndexRead = false;
+      }
+    } else {
+      // No marker file found.
+      prepareIndexRead = false;
+    }
+
+    boolean prepareRestored = false;
+    if (prepareIndexRead) {
+      if (prepareMarkerIndex != expectedPrepareIndex) {
+        LOG.error("Failed to restore OM prepare state, because the expected " +
+            "prepare index {} does not match the index {} written to the " +
+            "marker file.", expectedPrepareIndex, prepareMarkerIndex);
+      } else {
+        // Prepare state can only be restored if we read the expected index
+        // from the marker file.
+        prepareRestored = true;
+      }
+    }
+
+    if (prepareRestored) {
+      enablePrepareGate();

Review comment:
       No, looks like this was left over from an old version, I'll remove it.

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
##########
@@ -193,6 +196,32 @@ public TransactionContext startTransaction(
     return handleStartTransactionRequests(raftClientRequest, omRequest);
   }
 
+  @Override
+  public TransactionContext preAppendTransaction(TransactionContext trx)

Review comment:
       Yes, done.

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerPrepareState.java
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerStateMachine;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse.PrepareStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+
+/**
+ * Controls the prepare state of the {@link OzoneManager} containing the
+ * instance. When prepared, an ozone manager should have no Ratis logs
+ * remaining, disallow all write requests except prepare and cancel prepare,
+ * and have a marker file present on disk that will cause it to remain prepared
+ * on restart.
+ */
+public final class OzoneManagerPrepareState {
+  public static final long NO_PREPARE_INDEX = -1;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OzoneManagerPrepareState.class);
+
+  private boolean prepareGateEnabled;
+  private long prepareIndex;
+  private PrepareStatus status;
+  private final ConfigurationSource conf;
+
+  public OzoneManagerPrepareState(ConfigurationSource conf) {
+    prepareGateEnabled = false;
+    prepareIndex = NO_PREPARE_INDEX;
+    status = PrepareStatus.PREPARE_NOT_STARTED;
+    this.conf = conf;
+  }
+
+  /**
+   * Turns on the prepare gate flag, clears the prepare index, and moves the
+   * prepare status to {@link PrepareStatus#PREPARE_IN_PROGRESS}.
+   *
+   * Turning on the prepare gate flag will enable a gate in the
+   * {@link OzoneManagerStateMachine#preAppendTransaction} (called on leader
+   * OM only) and {@link OzoneManagerRatisServer#submitRequest}
+   * (called on all OMs) that block write requests from reaching the OM and
+   * fail them with error responses to the client.
+   */
+  public synchronized void enablePrepareGate() {
+    prepareGateEnabled = true;
+    prepareIndex = NO_PREPARE_INDEX;
+    status = PrepareStatus.PREPARE_IN_PROGRESS;
+  }
+
+  /**
+   * Removes the prepare marker file, clears the prepare index, turns off
+   * the prepare gate, and moves the prepare status to
+   * {@link PrepareStatus#PREPARE_NOT_STARTED}.
+   * This can be called from any state to clear the current prepare state.
+   *
+   * @throws IOException If the prepare marker file exists but cannot be
+   * deleted.
+   */
+  public synchronized void cancelPrepare()
+      throws IOException {
+    deletePrepareMarkerFile();
+    prepareIndex = NO_PREPARE_INDEX;
+    prepareGateEnabled = false;
+    status = PrepareStatus.PREPARE_NOT_STARTED;
+  }
+
+  /**
+   * Enables the prepare gate, writes the prepare marker file, sets the in
+   * memory prepare index, and
+   * moves the prepare status to {@link PrepareStatus#PREPARE_COMPLETED}.
+   * This can be called from any state to move the OM into prepare mode.
+   *
+   * @param index The log index to prepare the OM on.
+   * @throws IOException If the marker file cannot be written.
+   */
+  public synchronized void finishPrepare(long index) throws IOException {
+    finishPrepare(index, true);
+  }
+
+  private void finishPrepare(long index, boolean writeFile) throws IOException {
+    // Enabling the prepare gate is idempotent, and may have already been
+    // performed if we are the leader. If we are a follower, we must ensure
+    // this is run now in case we become the leader.
+    enablePrepareGate();
+
+    if (writeFile) {
+      writePrepareMarkerFile(index);
+    }
+    prepareIndex = index;
+    status = PrepareStatus.PREPARE_COMPLETED;
+  }
+
+  /**
+   * Uses the on disk marker file to determine the OM's prepare state.
+   * If the marker file exists and contains an index matching {@code
+   * expectedPrepareIndex}, the necessary steps will be taken to finish
+   * preparation and the state will be moved to
+   * {@link PrepareStatus#PREPARE_COMPLETED}.
+   * Else, the status will be moved to
+   * {@link PrepareStatus#PREPARE_NOT_STARTED} and any preparation steps will
+   * be cancelled.
+   *
+   * @return The status the OM is in after this method call.
+   * @throws IOException If the marker file cannot be read, and it cannot be
+   * deleted as part of moving to the
+   * {@link PrepareStatus#PREPARE_NOT_STARTED} state.
+   */
+  public synchronized PrepareStatus restorePrepare(long expectedPrepareIndex)
+      throws IOException {
+    boolean prepareIndexRead = true;
+    long prepareMarkerIndex = NO_PREPARE_INDEX;
+
+    File prepareMarkerFile = getPrepareMarkerFile();
+    if (prepareMarkerFile.exists()) {
+      byte[] data = new byte[(int) prepareMarkerFile.length()];
+      try(FileInputStream stream = new FileInputStream(prepareMarkerFile)) {
+        stream.read(data);
+      } catch (IOException e) {
+        LOG.error("Failed to read prepare marker file {} while restoring OM.",
+            prepareMarkerFile.getAbsolutePath());
+        prepareIndexRead = false;
+      }
+
+      try {
+        prepareMarkerIndex = Long.parseLong(
+            new String(data, StandardCharsets.UTF_8));
+      } catch (NumberFormatException e) {
+        LOG.error("Failed to parse log index from prepare marker file {} " +
+            "while restoring OM.", prepareMarkerFile.getAbsolutePath());
+        prepareIndexRead = false;
+      }
+    } else {
+      // No marker file found.
+      prepareIndexRead = false;
+    }
+
+    boolean prepareRestored = false;
+    if (prepareIndexRead) {
+      if (prepareMarkerIndex != expectedPrepareIndex) {
+        LOG.error("Failed to restore OM prepare state, because the expected " +
+            "prepare index {} does not match the index {} written to the " +
+            "marker file.", expectedPrepareIndex, prepareMarkerIndex);
+      } else {
+        // Prepare state can only be restored if we read the expected index
+        // from the marker file.
+        prepareRestored = true;
+      }
+    }
+
+    if (prepareRestored) {
+      enablePrepareGate();
+      // Do not rewrite the marker file, since we verified it already exists.
+      finishPrepare(prepareMarkerIndex, false);
+    } else {
+      // If the potentially faulty marker file cannot be deleted,
+      // propagate the IOException.
+      // If there is no marker file, this call sets the in memory state only.
+      cancelPrepare();
+    }
+
+    return status;
+  }
+
+  /**
+   * If the prepare gate is disabled, always returns true.
+   * If the prepare gate is enabled, returns true only if {@code
+   * requestType} is {@code Prepare}; {@code CancelPrepare} is not yet
+   * allowed (see the TODO below). Returns false otherwise.
+   */
+  public synchronized boolean requestAllowed(Type requestType) {
+    boolean requestAllowed = true;
+
+    if (prepareGateEnabled) {
+      // TODO: Also return true for cancel prepare when it is implemented.
+      requestAllowed = (requestType == Type.Prepare);
+    }
+
+    return requestAllowed;
+  }
+
+  /**
+   * @return the current log index and status of preparation.
+   * Both fields are returned together to provide a consistent view of the
+   * state, which would not be guaranteed if they had to be retrieved through
+   * separate getters.
+   */
+  public synchronized State getState() {
+    return new State(prepareIndex, status);
+  }
+
+  /**
+   * Creates a prepare marker file inside {@code metadataDir} which contains
+   * the log index {@code index}. If a marker file already exists, it will be
+   * overwritten.
+   */
+  private void writePrepareMarkerFile(long index) throws IOException {
+    File markerFile = getPrepareMarkerFile();
+    File parentDir = markerFile.getParentFile();
+    if (!parentDir.isDirectory() && !parentDir.mkdirs()) {
+      throw new IOException("Failed to create necessary directories in " +
+          "marker file path: " + parentDir);
+    }
+
+    try(FileOutputStream stream = new FileOutputStream(markerFile)) {
+      stream.write(Long.toString(index).getBytes(StandardCharsets.UTF_8));
+    }
+  }
+
+  private void deletePrepareMarkerFile()

Review comment:
       Done

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerPrepare.java
##########
@@ -138,77 +112,116 @@ public void testPrepareWithTransactions() throws Exception {
    */
 //  @Test
   public void testPrepareDownedOM() throws Exception {
+    setup();
     // Index of the OM that will be shut down during this test.
     final int shutdownOMIndex = 2;
-
-    MiniOzoneHAClusterImpl cluster = getCluster();
-    OzoneClient ozClient = OzoneClientFactory.getRpcClient(getConf());
-
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
-    ObjectStore store = ozClient.getObjectStore();
+    List<OzoneManager> runningOms = cluster.getOzoneManagersList();
 
     // Create keys with all 3 OMs up.
-    store.createVolume(volumeName);
-    OzoneVolume volume = store.getVolume(volumeName);
-    volume.createBucket(bucketName);
-
-    Set<String> writtenKeys = new HashSet<>();
-    for (int i = 1; i <= 50; i++) {
-      String keyName = keyPrefix + i;
-      writeTestData(store, volumeName, bucketName, keyName);
-      writtenKeys.add(keyName);
-    }
-
-    // Make sure all OMs have logs from writing data, so we can check that
-    // they are purged after prepare.
-    for (OzoneManager om: cluster.getOzoneManagersList()) {
-      LambdaTestUtils.await(timeoutMillis, 1000,
-          () -> logFilesPresentInRatisPeer(om));
-    }
+    Set<String> writtenKeys = writeKeysAndWaitForLogs(10, runningOms);
 
     // Shut down one OM.
     cluster.stopOzoneManager(shutdownOMIndex);
     OzoneManager downedOM = cluster.getOzoneManager(shutdownOMIndex);
     Assert.assertFalse(downedOM.isRunning());
+    Assert.assertEquals(runningOms.remove(shutdownOMIndex), downedOM);
 
     // Write keys with the remaining OMs up.
-    for (int i = 51; i <= 100; i++) {
-      String keyName = keyPrefix + i;
-      writeTestData(store, volumeName, bucketName, keyName);
-      writtenKeys.add(keyName);
-    }
+    writtenKeys.addAll(
+        writeKeysAndWaitForLogs(10, runningOms));
 
-    OzoneManagerProtocol ozoneManagerClient =
-        ozClient.getObjectStore().getClientProxy().getOzoneManagerClient();
-    long prepareIndex = ozoneManagerClient.prepareOzoneManager(
-        PREPARE_FLUSH_WAIT_TIMEOUT_SECONDS, PREPARE_FLUSH_INTERVAL_SECONDS);
+    long prepareIndex = submitPrepareRequest();
 
     // Check that the two live OMs are prepared.
-    for (OzoneManager om: cluster.getOzoneManagersList()) {
-      if (om != downedOM) {
-        // Follower may still be applying transactions.
-        waitAndCheckPrepared(om, prepareIndex);
-      }
-    }
+    assertClusterPrepared(prepareIndex, runningOms);
 
     // Restart the downed OM and wait for it to catch up.
     // Since prepare was the last Ratis transaction, it should have all data
     // it missed once it receives the prepare transaction.
     cluster.restartOzoneManager(downedOM, true);
-    LambdaTestUtils.await(timeoutMillis, 2000,
-        () -> checkPrepared(downedOM, prepareIndex));
+    runningOms.add(shutdownOMIndex, downedOM);
 
     // Make sure all OMs are prepared and still have data.
-    for (OzoneManager om: cluster.getOzoneManagersList()) {
-      List<OmKeyInfo> readKeys = om.getMetadataManager().listKeys(volumeName,
-          bucketName, null, keyPrefix, 100);
+    assertClusterPrepared(prepareIndex, runningOms);
+    assertKeysWritten(writtenKeys, runningOms);
+  }
+
+  @Test
+  public void testPrepareWithRestart() throws Exception {
+    setup();
+    writeKeysAndWaitForLogs(10);
+    long prepareIndex = submitPrepareRequest();
+    assertClusterPrepared(prepareIndex);
+
+    // Restart all ozone managers.
+    cluster.restartOzoneManager();
+
+    // No check for cleared logs, since Ratis meta transactions may slip in
+    // on restart.
+    assertClusterPrepared(prepareIndex);
+  }
 
-      Assert.assertEquals(writtenKeys.size(), readKeys.size());
-      for (OmKeyInfo keyInfo: readKeys) {
-        Assert.assertTrue(writtenKeys.contains(keyInfo.getKeyName()));
+  /**
+   * Issues requests on ten different threads, for which one is a prepare and
+   * the rest are create volume. We cannot be sure of the exact order that
+   * the requests will execute, so this test checks that the cluster ends in
+   * a prepared state, and that create volume requests either succeed, or fail
+   * indicating the cluster was prepared before they were encountered.
+   * @throws Exception
+   */
+  @Test
+  public void testPrepareWithMultipleThreads() throws Exception {
+    setup();
+    final int numThreads = 10;
+    final int prepareTaskIndex = 5;
+
+    ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+    // For the prepare task, the future will return a log index.
+    // For the create volume tasks, 0 (dummy value) will be returned.
+    List<Future<Long>> tasks = new ArrayList<>();
+
+    for (int i = 0; i < numThreads; i++) {
+      Callable<Long> task;
+      if (i == prepareTaskIndex) {
+        task = this::submitPrepareRequest;
+      } else {
+        String volumeName = VOLUME + i;
+        task = () -> {
+          clientProtocol.createVolume(volumeName);
+          return 0L;
+        };
       }
+      tasks.add(executorService.submit(task));
     }
+
+    // For each task, wait for it to complete and check its result.
+    for (int i = 0; i < numThreads; i++) {
+      Future<Long> future = tasks.get(i);
+
+      if (i == prepareTaskIndex) {
+        assertClusterPrepared(future.get());
+        assertRatisLogsCleared();
+      } else {
+        try {
+          // If this throws an exception, it should be an OMException
+          // indicating failure because the cluster was already prepared.
+          // If no exception is thrown, the volume should be created.
+          future.get();
+          String volumeName = VOLUME + i;
+          Assert.assertTrue(clientProtocol.listVolumes(volumeName, "", 1)
+              .stream()
+              .anyMatch((vol) -> vol.getName().equals(volumeName)));
+        } catch (ExecutionException ex) {
+          Throwable cause = ex.getCause();
+          Assert.assertTrue(cause instanceof OMException);
+          Assert.assertEquals(OMException.ResultCodes.NOT_SUPPORTED_OPERATION,

Review comment:
       Right, fixed.






[GitHub] [ozone] errose28 commented on a change in pull request #1705: HDDS-4569. Add pre append gate and marker file to OM prepare state.

Posted by GitBox <gi...@apache.org>.
errose28 commented on a change in pull request #1705:
URL: https://github.com/apache/ozone/pull/1705#discussion_r547538068



##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
##########
@@ -184,26 +188,18 @@ private static void waitForLogIndex(long indexToWaitFor,
 
   /**
    * Take a snapshot of the state machine at the last index, and purge ALL logs.
-   * @param impl RaftServerImpl instance
+   * @param division Raft server division.
    * @throws IOException on Error.
    */
-  public static long takeSnapshotAndPurgeLogs(RaftServerImpl impl)
+  public static long takeSnapshotAndPurgeLogs(RaftServer.Division division)
       throws IOException {
 
-    StateMachine stateMachine = impl.getStateMachine();
+    StateMachine stateMachine = division.getStateMachine();
     long snapshotIndex = stateMachine.takeSnapshot();
-    RaftLog raftLog = impl.getState().getLog();
-    long raftLogIndex = raftLog.getLastEntryTermIndex().getIndex();
-
-    // Ensure that Ratis's in memory snapshot index is the same as the index
-    // of its last log entry.
-    if (snapshotIndex != raftLogIndex) {
-      throw new IOException("Snapshot index " + snapshotIndex + " does not " +
-          "match last log index " + raftLogIndex);
-    }

Review comment:
       It is possible that an extra Ratis transaction to commit the prepare request happens after we take the snapshot. This entry is internal to Ratis so it is not sent to the state machine. This is normal, but with this check in place, it would cause the prepare to fail. HDDS-4610 will wait on this extra transaction and update the DB with its index when the snapshot is taken, which will provide consistent behavior here, instead of the transaction getting included only sometimes.
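       To make the failure mode concrete, here is a toy illustration of the removed equality check using plain numbers. It is only a sketch: the class name, helper names, and index values are hypothetical, and no Ratis types are involved.

```java
import java.io.IOException;

/** Toy model of the removed check; index values are made up for illustration. */
public class SnapshotIndexCheckSketch {

  /** Mirrors the removed check: fails unless both indices are identical. */
  static void strictCheck(long snapshotIndex, long lastLogIndex)
      throws IOException {
    if (snapshotIndex != lastLogIndex) {
      throw new IOException("Snapshot index " + snapshotIndex + " does not "
          + "match last log index " + lastLogIndex);
    }
  }

  /** Convenience wrapper that reports whether the strict check passes. */
  static boolean strictCheckPasses(long snapshotIndex, long lastLogIndex) {
    try {
      strictCheck(snapshotIndex, lastLogIndex);
      return true;
    } catch (IOException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    // Snapshot taken at index 10 and nothing else appended: check passes.
    System.out.println(strictCheckPasses(10, 10));
    // One internal entry lands after the snapshot: the same check now fails,
    // even though nothing is wrong from the state machine's point of view.
    System.out.println(strictCheckPasses(10, 11));
  }
}
```

       Whether the extra entry is present depends on timing, which is why the check made prepare fail only some of the time.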






[GitHub] [ozone] errose28 commented on a change in pull request #1705: HDDS-4569. Add pre append gate and marker file to OM prepare state.

Posted by GitBox <gi...@apache.org>.
errose28 commented on a change in pull request #1705:
URL: https://github.com/apache/ozone/pull/1705#discussion_r547527668



##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
##########
@@ -193,6 +196,32 @@ public TransactionContext startTransaction(
     return handleStartTransactionRequests(raftClientRequest, omRequest);
   }
 
+  public TransactionContext preAppendTransaction(TransactionContext trx)

Review comment:
       Done






[GitHub] [ozone] linyiqun commented on a change in pull request #1705: HDDS-4569. Add pre append gate and marker file to OM prepare state.

Posted by GitBox <gi...@apache.org>.
linyiqun commented on a change in pull request #1705:
URL: https://github.com/apache/ozone/pull/1705#discussion_r544338947



##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
##########
@@ -193,6 +196,32 @@ public TransactionContext startTransaction(
     return handleStartTransactionRequests(raftClientRequest, omRequest);
   }
 
+  public TransactionContext preAppendTransaction(TransactionContext trx)

Review comment:
       Can you add @Override here, so it is immediately clear that this is an overridden method?

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerPrepare.java
##########
@@ -138,75 +112,111 @@ public void testPrepareWithTransactions() throws Exception {
    */
 //  @Test
   public void testPrepareDownedOM() throws Exception {
+    setup();
     // Index of the OM that will be shut down during this test.
     final int shutdownOMIndex = 2;
-
-    MiniOzoneHAClusterImpl cluster = getCluster();
-    OzoneClient ozClient = OzoneClientFactory.getRpcClient(getConf());
-
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
-    ObjectStore store = ozClient.getObjectStore();
+    List<OzoneManager> runningOms = cluster.getOzoneManagersList();
 
     // Create keys with all 3 OMs up.
-    store.createVolume(volumeName);
-    OzoneVolume volume = store.getVolume(volumeName);
-    volume.createBucket(bucketName);
-
-    Set<String> writtenKeys = new HashSet<>();
-    for (int i = 1; i <= 50; i++) {
-      String keyName = keyPrefix + i;
-      writeTestData(store, volumeName, bucketName, keyName);
-      writtenKeys.add(keyName);
-    }
-
-    // Make sure all OMs have logs from writing data, so we can check that
-    // they are purged after prepare.
-    for (OzoneManager om: cluster.getOzoneManagersList()) {
-      LambdaTestUtils.await(timeoutMillis, 1000,
-          () -> logFilesPresentInRatisPeer(om));
-    }
+    Set<String> writtenKeys = writeKeysAndWaitForLogs(10, runningOms);
 
     // Shut down one OM.
     cluster.stopOzoneManager(shutdownOMIndex);
     OzoneManager downedOM = cluster.getOzoneManager(shutdownOMIndex);
     Assert.assertFalse(downedOM.isRunning());
+    Assert.assertEquals(runningOms.remove(shutdownOMIndex), downedOM);
 
     // Write keys with the remaining OMs up.
-    for (int i = 51; i <= 100; i++) {
-      String keyName = keyPrefix + i;
-      writeTestData(store, volumeName, bucketName, keyName);
-      writtenKeys.add(keyName);
-    }
+    writtenKeys.addAll(
+        writeKeysAndWaitForLogs(10, runningOms));
 
-    OzoneManagerProtocol ozoneManagerClient =
-        ozClient.getObjectStore().getClientProxy().getOzoneManagerClient();
-    long prepareIndex = ozoneManagerClient.prepareOzoneManager(
-        PREPARE_FLUSH_WAIT_TIMEOUT_SECONDS, PREPARE_FLUSH_INTERVAL_SECONDS);
+    long prepareIndex = submitPrepareRequest();
 
     // Check that the two live OMs are prepared.
-    for (OzoneManager om: cluster.getOzoneManagersList()) {
-      if (om != downedOM) {
-        // Follower may still be applying transactions.
-        waitAndCheckPrepared(om, prepareIndex);
-      }
-    }
+    assertClusterPrepared(prepareIndex, runningOms);
 
     // Restart the downed OM and wait for it to catch up.
     // Since prepare was the last Ratis transaction, it should have all data
     // it missed once it receives the prepare transaction.
     cluster.restartOzoneManager(downedOM, true);
-    LambdaTestUtils.await(timeoutMillis, 2000,
-        () -> checkPrepared(downedOM, prepareIndex));
+    runningOms.add(shutdownOMIndex, downedOM);
 
     // Make sure all OMs are prepared and still have data.
-    for (OzoneManager om: cluster.getOzoneManagersList()) {
-      List<OmKeyInfo> readKeys = om.getMetadataManager().listKeys(volumeName,
-          bucketName, null, keyPrefix, 100);
+    assertClusterPrepared(prepareIndex, runningOms);
+    assertKeysWritten(writtenKeys, runningOms);
+  }
+
+  @Test
+  public void testPrepareWithRestart() throws Exception {
+    setup();
+    writeKeysAndWaitForLogs(10);
+    long prepareIndex = submitPrepareRequest();
+    assertClusterPrepared(prepareIndex);
+
+    // Restart all ozone managers.
+    cluster.restartOzoneManager();
+
+    // No check for cleared logs, since Ratis meta transactions may slip in
+    // on restart.
+    assertClusterPrepared(prepareIndex);
+  }
 
-      Assert.assertEquals(writtenKeys.size(), readKeys.size());
-      for (OmKeyInfo keyInfo: readKeys) {
-        Assert.assertTrue(writtenKeys.contains(keyInfo.getKeyName()));
+  /**
+   * Issues requests on ten different threads, for which one is a prepare and
+   * the rest are create volume. We cannot be sure of the exact order that
+   * the requests will execute, so this test checks that the cluster ends in
+   * a prepared state, and that create volume requests either succeed, or fail
+   * indicating the cluster was prepared before they were encountered.
+   * @throws Exception
+   */
+  @Test
+  public void testPrepareWithMultipleThreads() throws Exception {
+    setup();
+    final int numThreads = 10;
+    final int prepareTaskIndex = 5;
+
+    ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+    // For the prepare task, the future will return a log index.
+    // For the create volume tasks, 0 (dummy value) will be returned.
+    List<Future<Long>> tasks = new ArrayList<>();
+
+    for (int i = 0; i < numThreads; i++) {
+      Callable<Long> task;
+      if (i == prepareTaskIndex) {
+        task = this::submitPrepareRequest;
+      } else {
+        String volumeName = VOLUME + i;
+        task = () -> {
+          clientProtocol.createVolume(volumeName);
+          return 0L;
+        };
+      }
+      tasks.add(executorService.submit(task));
+    }
+
+    // For each task, wait for it to complete and check its result.
+    for (int i = 0; i < numThreads; i++) {
+      Future<Long> future = tasks.get(i);
+
+      if (i == prepareTaskIndex) {
+        assertClusterPrepared(future.get());
+        assertRatisLogsCleared();
+      } else {
+        try {
+          // If this throws an exception, it should be an OMException
+          // indicating failure because the cluster was already prepared.
+          // If no exception is thrown, the volume should be created.
+          future.get();
+          String volumeName = VOLUME + i;
+          Assert.assertTrue(clientProtocol.listVolumes(volumeName, "", 1)
+              .stream()
+              .anyMatch((vol) -> vol.getName().equals(volumeName)));
+        } catch (ExecutionException ex) {
+          Throwable cause = ex.getCause();
+          Assert.assertTrue(cause instanceof OMException);
+          Assert.assertEquals(OMException.ResultCodes.NOT_SUPPORTED_OPERATION,
+              ((OMException) cause).getResult());
+        }
       }
     }

Review comment:
       Can you close executorService after the test?
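       One common way to do this is a try/finally around the task submission. The sketch below is standalone and only uses java.util.concurrent; the class and method names are hypothetical, and the tasks are placeholders rather than the prepare and create volume requests from the test.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: always release the thread pool, even on failure. */
public class ExecutorCleanupSketch {

  /** Stops accepting tasks, waits briefly, then forces shutdown if needed. */
  static void shutdownAndWait(ExecutorService executorService) {
    executorService.shutdown();
    try {
      if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
        executorService.shutdownNow();
      }
    } catch (InterruptedException e) {
      executorService.shutdownNow();
      Thread.currentThread().interrupt();
    }
  }

  /** Runs numThreads placeholder tasks and returns the sum of their results. */
  static long runTasks(int numThreads) {
    ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
    try {
      List<Future<Long>> tasks = new ArrayList<>();
      for (int i = 0; i < numThreads; i++) {
        final long value = i;
        Callable<Long> task = () -> value;
        tasks.add(executorService.submit(task));
      }
      long sum = 0;
      for (Future<Long> future : tasks) {
        sum += future.get();
      }
      return sum;
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      // Runs whether the assertions above pass or throw.
      shutdownAndWait(executorService);
    }
  }
}
```

       In the test itself, the same finally block could wrap the two loops, so the pool is released even if an assertion fails.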

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
##########
@@ -128,10 +129,25 @@ private static long nextCallId() {
    * @throws ServiceException
    */
   public OMResponse submitRequest(OMRequest omRequest) throws ServiceException {
-    RaftClientRequest raftClientRequest =
-        createWriteRaftClientRequest(omRequest);
-    RaftClientReply raftClientReply = submitRequestToRatis(raftClientRequest);
-    return processReply(omRequest, raftClientReply);
+    // In prepare mode, only prepare and cancel requests are allowed to go
+    // through.
+    if (OzoneManagerPrepareState.requestAllowed(omRequest.getCmdType())) {
+      RaftClientRequest raftClientRequest =
+          createWriteRaftClientRequest(omRequest);
+      RaftClientReply raftClientReply = submitRequestToRatis(raftClientRequest);
+
+      return processReply(omRequest, raftClientReply);
+    } else {
+      String message = "Cannot apply write request " +
+          omRequest.getCmdType().name() + " when OM is in prepare mode.";
+      OMResponse.Builder omResponse = OMResponse.newBuilder()
+          .setMessage(message)
+          .setStatus(OzoneManagerProtocolProtos.Status.NOT_SUPPORTED_OPERATION)

Review comment:
       Can we use a better status name for this, like OM_IN_PREPARE_MODE? 

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
##########
@@ -184,26 +188,18 @@ private static void waitForLogIndex(long indexToWaitFor,
 
   /**
    * Take a snapshot of the state machine at the last index, and purge ALL logs.
-   * @param impl RaftServerImpl instance
+   * @param division Raft server division.
    * @throws IOException on Error.
    */
-  public static long takeSnapshotAndPurgeLogs(RaftServerImpl impl)
+  public static long takeSnapshotAndPurgeLogs(RaftServer.Division division)
       throws IOException {
 
-    StateMachine stateMachine = impl.getStateMachine();
+    StateMachine stateMachine = division.getStateMachine();
     long snapshotIndex = stateMachine.takeSnapshot();
-    RaftLog raftLog = impl.getState().getLog();
-    long raftLogIndex = raftLog.getLastEntryTermIndex().getIndex();
-
-    // Ensure that Ratis's in memory snapshot index is the same as the index
-    // of its last log entry.
-    if (snapshotIndex != raftLogIndex) {
-      throw new IOException("Snapshot index " + snapshotIndex + " does not " +
-          "match last log index " + raftLogIndex);
-    }

Review comment:
       I don't fully understand this. Why are we removing this safety check?






[GitHub] [ozone] avijayanhwx commented on pull request #1705: HDDS-4569. Add pre append gate and marker file to OM prepare state.

Posted by GitBox <gi...@apache.org>.
avijayanhwx commented on pull request #1705:
URL: https://github.com/apache/ozone/pull/1705#issuecomment-754165733


   Ignoring CI failure since it will be handled in HDDS-4610.




[GitHub] [ozone] avijayanhwx commented on a change in pull request #1705: HDDS-4569. Add pre append gate and marker file to OM prepare state.

Posted by GitBox <gi...@apache.org>.
avijayanhwx commented on a change in pull request #1705:
URL: https://github.com/apache/ozone/pull/1705#discussion_r547732161



##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerPrepareState.java
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerStateMachine;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse.PrepareStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+
+/**
+ * Controls the prepare state of the {@link OzoneManager} containing the
+ * instance. When prepared, an ozone manager should have no Ratis logs
+ * remaining, disallow all write requests except prepare and cancel prepare,
+ * and have a marker file present on disk that will cause it to remain prepared
+ * on restart.
+ */
+public final class OzoneManagerPrepareState {
+  public static final long NO_PREPARE_INDEX = -1;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OzoneManagerPrepareState.class);
+
+  private boolean prepareGateEnabled;
+  private long prepareIndex;
+  private PrepareStatus status;
+  private final ConfigurationSource conf;
+
+  public OzoneManagerPrepareState(ConfigurationSource conf) {
+    prepareGateEnabled = false;
+    prepareIndex = NO_PREPARE_INDEX;
+    status = PrepareStatus.PREPARE_NOT_STARTED;
+    this.conf = conf;
+  }
+
+  /**
+   * Turns on the prepare gate flag, clears the prepare index, and moves the
+   * prepare status to {@link PrepareStatus#PREPARE_IN_PROGRESS}.
+   *
+   * Turning on the prepare gate flag will enable a gate in the
+   * {@link OzoneManagerStateMachine#preAppendTransaction} (called on leader
+   * OM only) and {@link OzoneManagerRatisServer#submitRequest}
+   * (called on all OMs) that block write requests from reaching the OM and
+   * fail them with error responses to the client.
+   */
+  public synchronized void enablePrepareGate() {
+    prepareGateEnabled = true;
+    prepareIndex = NO_PREPARE_INDEX;
+    status = PrepareStatus.PREPARE_IN_PROGRESS;
+  }
+
+  /**
+   * Removes the prepare marker file, clears the prepare index, turns off
+   * the prepare gate, and moves the prepare status to
+   * {@link PrepareStatus#PREPARE_NOT_STARTED}.
+   * This can be called from any state to clear the current prepare state.
+   *
+   * @throws IOException If the prepare marker file exists but cannot be
+   * deleted.
+   */
+  public synchronized void cancelPrepare()
+      throws IOException {
+    deletePrepareMarkerFile();
+    prepareIndex = NO_PREPARE_INDEX;
+    prepareGateEnabled = false;
+    status = PrepareStatus.PREPARE_NOT_STARTED;
+  }
+
+  /**
+   * Enables the prepare gate, writes the prepare marker file, sets the in
+   * memory prepare index, and
+   * moves the prepare status to {@link PrepareStatus#PREPARE_COMPLETED}.
+   * This can be called from any state to move the OM into prepare mode.
+   *
+   * @param index The log index to prepare the OM on.
+   * @throws IOException If the marker file cannot be written.
+   */
+  public synchronized void finishPrepare(long index) throws IOException {
+    finishPrepare(index, true);
+  }
+
+  private void finishPrepare(long index, boolean writeFile) throws IOException {
+    // Enabling the prepare gate is idempotent, and may have already been
+    // performed if we are the leader. If we are a follower, we must ensure
+    // this is run now in case we become the leader.
+    enablePrepareGate();
+
+    if (writeFile) {
+      writePrepareMarkerFile(index);
+    }
+    prepareIndex = index;
+    status = PrepareStatus.PREPARE_COMPLETED;
+  }
+
+  /**
+   * Uses the on disk marker file to determine the OM's prepare state.
+   * If the marker file exists and contains an index matching {@code
+   * expectedPrepareIndex}, the necessary steps will be taken to finish
+   * preparation and the state will be moved to
+   * {@link PrepareStatus#PREPARE_COMPLETED}.
+   * Else, the status will be moved to
+   * {@link PrepareStatus#PREPARE_NOT_STARTED} and any preparation steps will
+   * be cancelled.
+   *
+   * @return The status the OM is in after this method call.
+   * @throws IOException If the marker file cannot be read, and it cannot be
+   * deleted as part of moving to the
+   * {@link PrepareStatus#PREPARE_NOT_STARTED} state.
+   */
+  public synchronized PrepareStatus restorePrepare(long expectedPrepareIndex)
+      throws IOException {
+    boolean prepareIndexRead = true;
+    long prepareMarkerIndex = NO_PREPARE_INDEX;
+
+    File prepareMarkerFile = getPrepareMarkerFile();
+    if (prepareMarkerFile.exists()) {
+      byte[] data = new byte[(int) prepareMarkerFile.length()];
+      try(FileInputStream stream = new FileInputStream(prepareMarkerFile)) {
+        stream.read(data);
+      } catch (IOException e) {
+        LOG.error("Failed to read prepare marker file {} while restoring OM.",
+            prepareMarkerFile.getAbsolutePath());
+        prepareIndexRead = false;
+      }
+
+      try {
+        prepareMarkerIndex = Long.parseLong(
+            new String(data, StandardCharsets.UTF_8));
+      } catch (NumberFormatException e) {
+        LOG.error("Failed to parse log index from prepare marker file {} " +
+            "while restoring OM.", prepareMarkerFile.getAbsolutePath());
+        prepareIndexRead = false;
+      }
+    } else {
+      // No marker file found.
+      prepareIndexRead = false;
+    }
+
+    boolean prepareRestored = false;
+    if (prepareIndexRead) {
+      if (prepareMarkerIndex != expectedPrepareIndex) {
+        LOG.error("Failed to restore OM prepare state, because the expected " +
+            "prepare index {} does not match the index {} written to the " +
+            "marker file.", expectedPrepareIndex, prepareMarkerIndex);
+      } else {
+        // Prepare state can only be restored if we read the expected index
+        // from the marker file.
+        prepareRestored = true;
+      }
+    }
+
+    if (prepareRestored) {
+      enablePrepareGate();

Review comment:
       There is an 'enablePrepareGate()' in finishPrepare. Do we need the one in Line # 180? 
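       To illustrate why the extra call looks redundant, here is a toy model of the quoted state transitions. It is a simplified sketch, not the Ozone class: the class name, the local Status enum, and the snapshot() helper are hypothetical, and the marker file handling is left out.

```java
/** Toy model of the quoted transitions; not the real OzoneManagerPrepareState. */
public class PrepareGateSketch {
  enum Status { NOT_STARTED, IN_PROGRESS, COMPLETED }

  static final long NO_PREPARE_INDEX = -1;

  private boolean gateEnabled = false;
  private long prepareIndex = NO_PREPARE_INDEX;
  private Status status = Status.NOT_STARTED;

  synchronized void enablePrepareGate() {
    gateEnabled = true;
    prepareIndex = NO_PREPARE_INDEX;
    status = Status.IN_PROGRESS;
  }

  synchronized void finishPrepare(long index) {
    // finishPrepare already enables the gate before recording the index.
    enablePrepareGate();
    prepareIndex = index;
    status = Status.COMPLETED;
  }

  /** Flattens the three fields so two instances can be compared. */
  synchronized String snapshot() {
    return gateEnabled + ":" + prepareIndex + ":" + status;
  }

  /** Restore path as quoted: explicit gate call, then finishPrepare. */
  static String restoreWithExtraCall(long index) {
    PrepareGateSketch state = new PrepareGateSketch();
    state.enablePrepareGate();
    state.finishPrepare(index);
    return state.snapshot();
  }

  /** Restore path with the explicit gate call dropped. */
  static String restoreWithoutExtraCall(long index) {
    PrepareGateSketch state = new PrepareGateSketch();
    state.finishPrepare(index);
    return state.snapshot();
  }
}
```

       In this model both paths end in the same state, which suggests the explicit call before finishPrepare could be dropped.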

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
##########
@@ -193,6 +196,32 @@ public TransactionContext startTransaction(
     return handleStartTransactionRequests(raftClientRequest, omRequest);
   }
 
+  @Override
+  public TransactionContext preAppendTransaction(TransactionContext trx)

Review comment:
       Can we add a unit test for this method since it has a few different execution flows?

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerPrepareState.java
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerStateMachine;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse.PrepareStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+
+/**
+ * Controls the prepare state of the {@link OzoneManager} containing the
+ * instance. When prepared, an ozone manager should have no Ratis logs
+ * remaining, disallow all write requests except prepare and cancel prepare,
+ * and have a marker file present on disk that will cause it to remain prepared
+ * on restart.
+ */
+public final class OzoneManagerPrepareState {
+  public static final long NO_PREPARE_INDEX = -1;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OzoneManagerPrepareState.class);
+
+  private boolean prepareGateEnabled;
+  private long prepareIndex;
+  private PrepareStatus status;
+  private final ConfigurationSource conf;
+
+  public OzoneManagerPrepareState(ConfigurationSource conf) {
+    prepareGateEnabled = false;
+    prepareIndex = NO_PREPARE_INDEX;
+    status = PrepareStatus.PREPARE_NOT_STARTED;
+    this.conf = conf;
+  }
+
+  /**
+   * Turns on the prepare gate flag, clears the prepare index, and moves the
+   * prepare status to {@link PrepareStatus#PREPARE_IN_PROGRESS}.
+   *
+   * Turning on the prepare gate flag will enable a gate in the
+   * {@link OzoneManagerStateMachine#preAppendTransaction} (called on leader
+   * OM only) and {@link OzoneManagerRatisServer#submitRequest}
+   * (called on all OMs) that block write requests from reaching the OM and
+   * fail them with error responses to the client.
+   */
+  public synchronized void enablePrepareGate() {
+    prepareGateEnabled = true;
+    prepareIndex = NO_PREPARE_INDEX;
+    status = PrepareStatus.PREPARE_IN_PROGRESS;
+  }
+
+  /**
+   * Removes the prepare marker file, clears the prepare index, turns off
+   * the prepare gate, and moves the prepare status to
+   * {@link PrepareStatus#PREPARE_NOT_STARTED}.
+   * This can be called from any state to clear the current prepare state.
+   *
+   * @throws IOException If the prepare marker file exists but cannot be
+   * deleted.
+   */
+  public synchronized void cancelPrepare()
+      throws IOException {
+    deletePrepareMarkerFile();
+    prepareIndex = NO_PREPARE_INDEX;
+    prepareGateEnabled = false;
+    status = PrepareStatus.PREPARE_NOT_STARTED;
+  }
+
+  /**
+   * Enables the prepare gate, writes the prepare marker file, sets the in
+   * memory prepare index, and
+   * moves the prepare status to {@link PrepareStatus#PREPARE_COMPLETED}.
+   * This can be called from any state to move the OM into prepare mode.
+   *
+   * @param index The log index to prepare the OM on.
+   * @throws IOException If the marker file cannot be written.
+   */
+  public synchronized void finishPrepare(long index) throws IOException {
+    finishPrepare(index, true);
+  }
+
+  private void finishPrepare(long index, boolean writeFile) throws IOException {
+    // Enabling the prepare gate is idempotent, and may have already been
+    // performed if we are the leader. If we are a follower, we must ensure
+    // this is run now in case we become the leader.
+    enablePrepareGate();
+
+    if (writeFile) {
+      writePrepareMarkerFile(index);
+    }
+    prepareIndex = index;
+    status = PrepareStatus.PREPARE_COMPLETED;
+  }
+
+  /**
+   * Uses the on disk marker file to determine the OM's prepare state.
+   * If the marker file exists and contains an index matching {@code
+   * expectedPrepareIndex}, the necessary steps will be taken to finish
+   * preparation and the state will be moved to
+   * {@link PrepareStatus#PREPARE_COMPLETED}.
+   * Else, the status will be moved to
+   * {@link PrepareStatus#PREPARE_NOT_STARTED} and any preparation steps will
+   * be cancelled.
+   *
+   * @return The status the OM is in after this method call.
+   * @throws IOException If the marker file cannot be read, and it cannot be
+   * deleted as part of moving to the
+   * {@link PrepareStatus#PREPARE_NOT_STARTED} state.
+   */
+  public synchronized PrepareStatus restorePrepare(long expectedPrepareIndex)
+      throws IOException {
+    boolean prepareIndexRead = true;
+    long prepareMarkerIndex = NO_PREPARE_INDEX;
+
+    File prepareMarkerFile = getPrepareMarkerFile();
+    if (prepareMarkerFile.exists()) {
+      byte[] data = new byte[(int) prepareMarkerFile.length()];
+      try(FileInputStream stream = new FileInputStream(prepareMarkerFile)) {
+        stream.read(data);
+      } catch (IOException e) {
+        LOG.error("Failed to read prepare marker file {} while restoring OM.",
+            prepareMarkerFile.getAbsolutePath());
+        prepareIndexRead = false;
+      }
+
+      try {
+        prepareMarkerIndex = Long.parseLong(
+            new String(data, StandardCharsets.UTF_8));
+      } catch (NumberFormatException e) {
+        LOG.error("Failed to parse log index from prepare marker file {} " +
+            "while restoring OM.", prepareMarkerFile.getAbsolutePath());
+        prepareIndexRead = false;
+      }
+    } else {
+      // No marker file found.
+      prepareIndexRead = false;
+    }
+
+    boolean prepareRestored = false;
+    if (prepareIndexRead) {
+      if (prepareMarkerIndex != expectedPrepareIndex) {
+        LOG.error("Failed to restore OM prepare state, because the expected " +
+            "prepare index {} does not match the index {} written to the " +
+            "marker file.", expectedPrepareIndex, prepareMarkerIndex);
+      } else {
+        // Prepare state can only be restored if we read the expected index
+        // from the marker file.
+        prepareRestored = true;
+      }
+    }
+
+    if (prepareRestored) {
+      enablePrepareGate();
+      // Do not rewrite the marker file, since we verified it already exists.
+      finishPrepare(prepareMarkerIndex, false);
+    } else {
+      // If the potentially faulty marker file cannot be deleted,
+      // propagate the IOException.
+      // If there is no marker file, this call sets the in memory state only.
+      cancelPrepare();
+    }
+
+    return status;
+  }
+
+  /**
+   * If the prepare gate is disabled, always returns true.
+   * If the prepare gate is enabled, returns true only if {@code
+   * requestType} is {@code Prepare} or {@code CancelPrepare}. Returns false
+   * otherwise.
+   */
+  public synchronized boolean requestAllowed(Type requestType) {
+    boolean requestAllowed = true;
+
+    if (prepareGateEnabled) {
+      // TODO: Also return true for cancel prepare when it is implemented.
+      requestAllowed = (requestType == Type.Prepare);
+    }
+
+    return requestAllowed;
+  }
+
+  /**
+   * @return the current log index and status of preparation.
+   * Both fields are returned together to provide a consistent view of the
+   * state, which would not be guaranteed if they had to be retrieved through
+   * separate getters.
+   */
+  public synchronized State getState() {
+    return new State(prepareIndex, status);
+  }
+
+  /**
+   * Creates a prepare marker file inside {@code metadataDir} which contains
+   * the log index {@code index}. If a marker file already exists, it will be
+   * overwritten.
+   */
+  private void writePrepareMarkerFile(long index) throws IOException {
+    File markerFile = getPrepareMarkerFile();
+    File parentDir = markerFile.getParentFile();
+    if (!parentDir.mkdirs()) {
+      throw new IOException("Failed to create necessary directories in " +
+          "marker file path: " + parentDir);
+    }
+
+    try(FileOutputStream stream = new FileOutputStream(markerFile)) {
+      stream.write(Long.toString(index).getBytes(StandardCharsets.UTF_8));
+    }
+  }
+
+  private void deletePrepareMarkerFile()

Review comment:
       Can we add a log line when we create & delete the marker file? It will help us with troubleshooting issues with preparation.
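
A minimal sketch of what the requested logging could look like. It is not the code from this pull request: the class name `PrepareMarkerFileSketch` and its methods are hypothetical, and it uses `java.util.logging` and `java.nio.file` only so that it runs without the Ozone dependencies. It also uses `Files.createDirectories`, which, unlike `File#mkdirs`, does not report a failure when the parent directory already exists.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.logging.Logger;

/** Hypothetical stand-in for the marker file handling; not the Ozone class. */
final class PrepareMarkerFileSketch {
  private static final Logger LOG =
      Logger.getLogger(PrepareMarkerFileSketch.class.getName());

  private final Path markerFile;

  PrepareMarkerFileSketch(Path markerFile) {
    this.markerFile = markerFile;
  }

  /** Writes the index to the marker file, overwriting any existing file. */
  void write(long index) throws IOException {
    Path parent = markerFile.getParent();
    if (parent != null) {
      // Succeeds whether or not the directory already exists.
      Files.createDirectories(parent);
    }
    Files.write(markerFile,
        Long.toString(index).getBytes(StandardCharsets.UTF_8));
    LOG.info("Wrote prepare marker file " + markerFile.toAbsolutePath()
        + " with log index " + index);
  }

  /** Reads the index back from the marker file. */
  long read() throws IOException {
    byte[] data = Files.readAllBytes(markerFile);
    return Long.parseLong(new String(data, StandardCharsets.UTF_8).trim());
  }

  /** Deletes the marker file if present; returns true if a file was removed. */
  boolean delete() throws IOException {
    boolean deleted = Files.deleteIfExists(markerFile);
    if (deleted) {
      LOG.info("Deleted prepare marker file " + markerFile.toAbsolutePath());
    } else {
      LOG.info("No prepare marker file to delete at "
          + markerFile.toAbsolutePath());
    }
    return deleted;
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("om-prepare");
    PrepareMarkerFileSketch marker =
        new PrepareMarkerFileSketch(dir.resolve("current").resolve("marker"));
    marker.write(10L);
    System.out.println("Index on disk: " + marker.read());
    marker.delete();
  }
}
```

Logging both the successful delete and the case where there was nothing to delete makes it possible to tell from the log alone whether a restart found a stale marker.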

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerPrepare.java
##########
@@ -138,77 +112,116 @@ public void testPrepareWithTransactions() throws Exception {
    */
 //  @Test
   public void testPrepareDownedOM() throws Exception {
+    setup();
     // Index of the OM that will be shut down during this test.
     final int shutdownOMIndex = 2;
-
-    MiniOzoneHAClusterImpl cluster = getCluster();
-    OzoneClient ozClient = OzoneClientFactory.getRpcClient(getConf());
-
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
-    ObjectStore store = ozClient.getObjectStore();
+    List<OzoneManager> runningOms = cluster.getOzoneManagersList();
 
     // Create keys with all 3 OMs up.
-    store.createVolume(volumeName);
-    OzoneVolume volume = store.getVolume(volumeName);
-    volume.createBucket(bucketName);
-
-    Set<String> writtenKeys = new HashSet<>();
-    for (int i = 1; i <= 50; i++) {
-      String keyName = keyPrefix + i;
-      writeTestData(store, volumeName, bucketName, keyName);
-      writtenKeys.add(keyName);
-    }
-
-    // Make sure all OMs have logs from writing data, so we can check that
-    // they are purged after prepare.
-    for (OzoneManager om: cluster.getOzoneManagersList()) {
-      LambdaTestUtils.await(timeoutMillis, 1000,
-          () -> logFilesPresentInRatisPeer(om));
-    }
+    Set<String> writtenKeys = writeKeysAndWaitForLogs(10, runningOms);
 
     // Shut down one OM.
     cluster.stopOzoneManager(shutdownOMIndex);
     OzoneManager downedOM = cluster.getOzoneManager(shutdownOMIndex);
     Assert.assertFalse(downedOM.isRunning());
+    Assert.assertEquals(runningOms.remove(shutdownOMIndex), downedOM);
 
     // Write keys with the remaining OMs up.
-    for (int i = 51; i <= 100; i++) {
-      String keyName = keyPrefix + i;
-      writeTestData(store, volumeName, bucketName, keyName);
-      writtenKeys.add(keyName);
-    }
+    writtenKeys.addAll(
+        writeKeysAndWaitForLogs(10, runningOms));
 
-    OzoneManagerProtocol ozoneManagerClient =
-        ozClient.getObjectStore().getClientProxy().getOzoneManagerClient();
-    long prepareIndex = ozoneManagerClient.prepareOzoneManager(
-        PREPARE_FLUSH_WAIT_TIMEOUT_SECONDS, PREPARE_FLUSH_INTERVAL_SECONDS);
+    long prepareIndex = submitPrepareRequest();
 
     // Check that the two live OMs are prepared.
-    for (OzoneManager om: cluster.getOzoneManagersList()) {
-      if (om != downedOM) {
-        // Follower may still be applying transactions.
-        waitAndCheckPrepared(om, prepareIndex);
-      }
-    }
+    assertClusterPrepared(prepareIndex, runningOms);
 
     // Restart the downed OM and wait for it to catch up.
     // Since prepare was the last Ratis transaction, it should have all data
     // it missed once it receives the prepare transaction.
     cluster.restartOzoneManager(downedOM, true);
-    LambdaTestUtils.await(timeoutMillis, 2000,
-        () -> checkPrepared(downedOM, prepareIndex));
+    runningOms.add(shutdownOMIndex, downedOM);
 
     // Make sure all OMs are prepared and still have data.
-    for (OzoneManager om: cluster.getOzoneManagersList()) {
-      List<OmKeyInfo> readKeys = om.getMetadataManager().listKeys(volumeName,
-          bucketName, null, keyPrefix, 100);
+    assertClusterPrepared(prepareIndex, runningOms);
+    assertKeysWritten(writtenKeys, runningOms);
+  }
+
+  @Test
+  public void testPrepareWithRestart() throws Exception {
+    setup();
+    writeKeysAndWaitForLogs(10);
+    long prepareIndex = submitPrepareRequest();
+    assertClusterPrepared(prepareIndex);
+
+    // Restart all ozone managers.
+    cluster.restartOzoneManager();
+
+    // No check for cleared logs, since Ratis meta transactions may slip in
+    // on restart.
+    assertClusterPrepared(prepareIndex);
+  }
 
-      Assert.assertEquals(writtenKeys.size(), readKeys.size());
-      for (OmKeyInfo keyInfo: readKeys) {
-        Assert.assertTrue(writtenKeys.contains(keyInfo.getKeyName()));
+  /**
+   * Issues requests on ten different threads, for which one is a prepare and
+   * the rest are create volume. We cannot be sure of the exact order that
+   * the requests will execute, so this test checks that the cluster ends in
+   * a prepared state, and that create volume requests either succeed, or fail
+   * indicating the cluster was prepared before they were encountered.
+   * @throws Exception
+   */
+  @Test
+  public void testPrepareWithMultipleThreads() throws Exception {
+    setup();
+    final int numThreads = 10;
+    final int prepareTaskIndex = 5;
+
+    ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+    // For the prepare task, the future will return a log index.
+    // For the create volume tasks, 0 (dummy value) will be returned.
+    List<Future<Long>> tasks = new ArrayList<>();
+
+    for (int i = 0; i < numThreads; i++) {
+      Callable<Long> task;
+      if (i == prepareTaskIndex) {
+        task = this::submitPrepareRequest;
+      } else {
+        String volumeName = VOLUME + i;
+        task = () -> {
+          clientProtocol.createVolume(volumeName);
+          return 0L;
+        };
       }
+      tasks.add(executorService.submit(task));
     }
+
+    // For each task, wait for it to complete and check its result.
+    for (int i = 0; i < numThreads; i++) {
+      Future<Long> future = tasks.get(i);
+
+      if (i == prepareTaskIndex) {
+        assertClusterPrepared(future.get());
+        assertRatisLogsCleared();
+      } else {
+        try {
+          // If this throws an exception, it should be an OMException
+          // indicating failure because the cluster was already prepared.
+          // If no exception is thrown, the volume should be created.
+          future.get();
+          String volumeName = VOLUME + i;
+          Assert.assertTrue(clientProtocol.listVolumes(volumeName, "", 1)
+              .stream()
+              .anyMatch((vol) -> vol.getName().equals(volumeName)));
+        } catch (ExecutionException ex) {
+          Throwable cause = ex.getCause();
+          Assert.assertTrue(cause instanceof OMException);
+          Assert.assertEquals(OMException.ResultCodes.NOT_SUPPORTED_OPERATION,

Review comment:
       I believe this should be NOT_SUPPORTED_OPERATION_WHEN_PREPARED.
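
The outcome this test expects can be sketched without a cluster. The example below is an illustration under stated assumptions, not the test from the pull request: `FakeOm`, `RejectedException`, and `runRace` are hypothetical names, and the in-memory `FakeOm` stands in for a real OM. It shows the check being discussed here: each write either succeeds, or fails with the dedicated prepared-mode result code rather than the generic one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PrepareRaceSketch {
  enum ResultCode {
    NOT_SUPPORTED_OPERATION, NOT_SUPPORTED_OPERATION_WHEN_PREPARED
  }

  /** Hypothetical stand-in for OMException. */
  static final class RejectedException extends Exception {
    private final ResultCode result;

    RejectedException(ResultCode result) {
      super(result.name());
      this.result = result;
    }

    ResultCode getResult() {
      return result;
    }
  }

  /** Hypothetical in-memory OM: rejects writes once prepared. */
  static final class FakeOm {
    private boolean prepared;
    private final List<String> volumes = new ArrayList<>();

    synchronized void prepare() {
      prepared = true;
    }

    synchronized boolean isPrepared() {
      return prepared;
    }

    synchronized void createVolume(String name) throws RejectedException {
      if (prepared) {
        throw new RejectedException(
            ResultCode.NOT_SUPPORTED_OPERATION_WHEN_PREPARED);
      }
      volumes.add(name);
    }

    synchronized boolean hasVolume(String name) {
      return volumes.contains(name);
    }
  }

  /** Runs one prepare and several writes concurrently; returns rejections. */
  static int runRace(int numThreads, int prepareTaskIndex)
      throws InterruptedException {
    FakeOm om = new FakeOm();
    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
    List<Future<Boolean>> futures = new ArrayList<>();
    try {
      for (int i = 0; i < numThreads; i++) {
        final String volumeName = "vol" + i;
        Callable<Boolean> task;
        if (i == prepareTaskIndex) {
          task = () -> {
            om.prepare();
            return true;
          };
        } else {
          task = () -> {
            om.createVolume(volumeName);
            return true;
          };
        }
        futures.add(executor.submit(task));
      }

      int rejected = 0;
      for (int i = 0; i < numThreads; i++) {
        if (i == prepareTaskIndex) {
          continue;
        }
        String volumeName = "vol" + i;
        try {
          futures.get(i).get();
          // No exception: the write ran before prepare, so the volume exists.
          if (!om.hasVolume(volumeName)) {
            throw new AssertionError("Missing volume " + volumeName);
          }
        } catch (ExecutionException ex) {
          // Exception: it must carry the dedicated prepared-mode code.
          Throwable cause = ex.getCause();
          if (!(cause instanceof RejectedException)
              || ((RejectedException) cause).getResult()
              != ResultCode.NOT_SUPPORTED_OPERATION_WHEN_PREPARED) {
            throw new AssertionError("Unexpected failure: " + cause);
          }
          rejected++;
        }
      }
      return rejected;
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("Rejected writes: " + runRace(10, 5));
  }
}
```

The number of rejected writes varies from run to run, which is why the check is written per task rather than against a fixed count.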






[GitHub] [ozone] errose28 commented on a change in pull request #1705: HDDS-4569. Add pre append gate and marker file to OM prepare state.

Posted by GitBox <gi...@apache.org>.
errose28 commented on a change in pull request #1705:
URL: https://github.com/apache/ozone/pull/1705#discussion_r547527463



##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
##########
@@ -128,10 +129,25 @@ private static long nextCallId() {
    * @throws ServiceException
    */
   public OMResponse submitRequest(OMRequest omRequest) throws ServiceException {
-    RaftClientRequest raftClientRequest =
-        createWriteRaftClientRequest(omRequest);
-    RaftClientReply raftClientReply = submitRequestToRatis(raftClientRequest);
-    return processReply(omRequest, raftClientReply);
+    // In prepare mode, only prepare and cancel requests are allowed to go
+    // through.
+    if (OzoneManagerPrepareState.requestAllowed(omRequest.getCmdType())) {
+      RaftClientRequest raftClientRequest =
+          createWriteRaftClientRequest(omRequest);
+      RaftClientReply raftClientReply = submitRequestToRatis(raftClientRequest);
+
+      return processReply(omRequest, raftClientReply);
+    } else {
+      String message = "Cannot apply write request " +
+          omRequest.getCmdType().name() + " when OM is in prepare mode.";
+      OMResponse.Builder omResponse = OMResponse.newBuilder()
+          .setMessage(message)
+          .setStatus(OzoneManagerProtocolProtos.Status.NOT_SUPPORTED_OPERATION)

Review comment:
       Yes, I have created a new dedicated status for rejecting requests because the OM is in prepare mode. Thanks for the suggestion!
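
A rough sketch of the gate with a dedicated status, for illustration only. The enums and the `Response` class below are hypothetical stand-ins for the protobuf types, the status name is taken from the suggestion earlier in this thread, and allowing `CancelPrepare` through the gate is an assumption based on the code comment in the diff above.

```java
import java.util.EnumSet;
import java.util.Set;

/** Illustrative only: hypothetical types, not the Ozone protobuf classes. */
final class PrepareGateSketch {
  enum RequestType { Prepare, CancelPrepare, CreateVolume, CreateKey }

  enum Status { OK, NOT_SUPPORTED_OPERATION_WHEN_PREPARED }

  static final class Response {
    final Status status;
    final String message;

    Response(Status status, String message) {
      this.status = status;
      this.message = message;
    }
  }

  // Assumption: both prepare and cancel prepare pass the gate.
  private static final Set<RequestType> ALLOWED_WHEN_PREPARED =
      EnumSet.of(RequestType.Prepare, RequestType.CancelPrepare);

  private boolean gateEnabled = false;

  synchronized void enablePrepareGate() {
    gateEnabled = true;
  }

  /** With the gate disabled everything passes; otherwise only the allowed set. */
  synchronized boolean requestAllowed(RequestType type) {
    return !gateEnabled || ALLOWED_WHEN_PREPARED.contains(type);
  }

  Response submitRequest(RequestType type) {
    if (requestAllowed(type)) {
      // A real server would forward the request to Ratis here.
      return new Response(Status.OK, "");
    }
    return new Response(Status.NOT_SUPPORTED_OPERATION_WHEN_PREPARED,
        "Cannot apply write request " + type.name()
            + " when OM is in prepare mode.");
  }

  public static void main(String[] args) {
    PrepareGateSketch gate = new PrepareGateSketch();
    gate.enablePrepareGate();
    Response response = gate.submitRequest(RequestType.CreateVolume);
    System.out.println(response.status + ": " + response.message);
  }
}
```

A dedicated status lets a client distinguish "the OM is prepared" from an operation that is never supported, without parsing the message text.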






[GitHub] [ozone] linyiqun commented on a change in pull request #1705: HDDS-4569. Add pre append gate and marker file to OM prepare state.

Posted by GitBox <gi...@apache.org>.
linyiqun commented on a change in pull request #1705:
URL: https://github.com/apache/ozone/pull/1705#discussion_r547997929



##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
##########
@@ -184,26 +188,18 @@ private static void waitForLogIndex(long indexToWaitFor,
 
   /**
    * Take a snapshot of the state machine at the last index, and purge ALL logs.
-   * @param impl RaftServerImpl instance
+   * @param division Raft server division.
    * @throws IOException on Error.
    */
-  public static long takeSnapshotAndPurgeLogs(RaftServerImpl impl)
+  public static long takeSnapshotAndPurgeLogs(RaftServer.Division division)
       throws IOException {
 
-    StateMachine stateMachine = impl.getStateMachine();
+    StateMachine stateMachine = division.getStateMachine();
     long snapshotIndex = stateMachine.takeSnapshot();
-    RaftLog raftLog = impl.getState().getLog();
-    long raftLogIndex = raftLog.getLastEntryTermIndex().getIndex();
-
-    // Ensure that Ratis's in memory snapshot index is the same as the index
-    // of its last log entry.
-    if (snapshotIndex != raftLogIndex) {
-      throw new IOException("Snapshot index " + snapshotIndex + " does not " +
-          "match last log index " + raftLogIndex);
-    }

Review comment:
       Okay, got it.



