You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2020/12/16 14:27:48 UTC

[GitHub] [ozone] linyiqun commented on a change in pull request #1705: HDDS-4569. Add pre append gate and marker file to OM prepare state.

linyiqun commented on a change in pull request #1705:
URL: https://github.com/apache/ozone/pull/1705#discussion_r544338947



##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
##########
@@ -193,6 +196,32 @@ public TransactionContext startTransaction(
     return handleStartTransactionRequests(raftClientRequest, omRequest);
   }
 
+  public TransactionContext preAppendTransaction(TransactionContext trx)

Review comment:
       Can you add @Override here, so it is immediately clear that this method overrides a superclass method?

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerPrepare.java
##########
@@ -138,75 +112,111 @@ public void testPrepareWithTransactions() throws Exception {
    */
 //  @Test
   public void testPrepareDownedOM() throws Exception {
+    setup();
     // Index of the OM that will be shut down during this test.
     final int shutdownOMIndex = 2;
-
-    MiniOzoneHAClusterImpl cluster = getCluster();
-    OzoneClient ozClient = OzoneClientFactory.getRpcClient(getConf());
-
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
-    ObjectStore store = ozClient.getObjectStore();
+    List<OzoneManager> runningOms = cluster.getOzoneManagersList();
 
     // Create keys with all 3 OMs up.
-    store.createVolume(volumeName);
-    OzoneVolume volume = store.getVolume(volumeName);
-    volume.createBucket(bucketName);
-
-    Set<String> writtenKeys = new HashSet<>();
-    for (int i = 1; i <= 50; i++) {
-      String keyName = keyPrefix + i;
-      writeTestData(store, volumeName, bucketName, keyName);
-      writtenKeys.add(keyName);
-    }
-
-    // Make sure all OMs have logs from writing data, so we can check that
-    // they are purged after prepare.
-    for (OzoneManager om: cluster.getOzoneManagersList()) {
-      LambdaTestUtils.await(timeoutMillis, 1000,
-          () -> logFilesPresentInRatisPeer(om));
-    }
+    Set<String> writtenKeys = writeKeysAndWaitForLogs(10, runningOms);
 
     // Shut down one OM.
     cluster.stopOzoneManager(shutdownOMIndex);
     OzoneManager downedOM = cluster.getOzoneManager(shutdownOMIndex);
     Assert.assertFalse(downedOM.isRunning());
+    Assert.assertEquals(runningOms.remove(shutdownOMIndex), downedOM);
 
     // Write keys with the remaining OMs up.
-    for (int i = 51; i <= 100; i++) {
-      String keyName = keyPrefix + i;
-      writeTestData(store, volumeName, bucketName, keyName);
-      writtenKeys.add(keyName);
-    }
+    writtenKeys.addAll(
+        writeKeysAndWaitForLogs(10, runningOms));
 
-    OzoneManagerProtocol ozoneManagerClient =
-        ozClient.getObjectStore().getClientProxy().getOzoneManagerClient();
-    long prepareIndex = ozoneManagerClient.prepareOzoneManager(
-        PREPARE_FLUSH_WAIT_TIMEOUT_SECONDS, PREPARE_FLUSH_INTERVAL_SECONDS);
+    long prepareIndex = submitPrepareRequest();
 
     // Check that the two live OMs are prepared.
-    for (OzoneManager om: cluster.getOzoneManagersList()) {
-      if (om != downedOM) {
-        // Follower may still be applying transactions.
-        waitAndCheckPrepared(om, prepareIndex);
-      }
-    }
+    assertClusterPrepared(prepareIndex, runningOms);
 
     // Restart the downed OM and wait for it to catch up.
     // Since prepare was the last Ratis transaction, it should have all data
     // it missed once it receives the prepare transaction.
     cluster.restartOzoneManager(downedOM, true);
-    LambdaTestUtils.await(timeoutMillis, 2000,
-        () -> checkPrepared(downedOM, prepareIndex));
+    runningOms.add(shutdownOMIndex, downedOM);
 
     // Make sure all OMs are prepared and still have data.
-    for (OzoneManager om: cluster.getOzoneManagersList()) {
-      List<OmKeyInfo> readKeys = om.getMetadataManager().listKeys(volumeName,
-          bucketName, null, keyPrefix, 100);
+    assertClusterPrepared(prepareIndex, runningOms);
+    assertKeysWritten(writtenKeys, runningOms);
+  }
+
+  @Test
+  public void testPrepareWithRestart() throws Exception {
+    setup();
+    writeKeysAndWaitForLogs(10);
+    long prepareIndex = submitPrepareRequest();
+    assertClusterPrepared(prepareIndex);
+
+    // Restart all ozone managers.
+    cluster.restartOzoneManager();
+
+    // No check for cleared logs, since Ratis meta transactions may slip in
+    // on restart.
+    assertClusterPrepared(prepareIndex);
+  }
 
-      Assert.assertEquals(writtenKeys.size(), readKeys.size());
-      for (OmKeyInfo keyInfo: readKeys) {
-        Assert.assertTrue(writtenKeys.contains(keyInfo.getKeyName()));
+  /**
+   * Issues requests on ten different threads, for which one is a prepare and
+   * the rest are create volume. We cannot be sure of the exact order that
+   * the requests will execute, so this test checks that the cluster ends in
+   * a prepared state, and that create volume requests either succeed, or fail
+   * indicating the cluster was prepared before they were encountered.
+   * @throws Exception
+   */
+  @Test
+  public void testPrepareWithMultipleThreads() throws Exception {
+    setup();
+    final int numThreads = 10;
+    final int prepareTaskIndex = 5;
+
+    ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+    // For the prepare task, the future will return a log index.
+    // For the create volume tasks, 0 (dummy value) will be returned.
+    List<Future<Long>> tasks = new ArrayList<>();
+
+    for (int i = 0; i < numThreads; i++) {
+      Callable<Long> task;
+      if (i == prepareTaskIndex) {
+        task = this::submitPrepareRequest;
+      } else {
+        String volumeName = VOLUME + i;
+        task = () -> {
+          clientProtocol.createVolume(volumeName);
+          return 0L;
+        };
+      }
+      tasks.add(executorService.submit(task));
+    }
+
+    // For each task, wait for it to complete and check its result.
+    for (int i = 0; i < numThreads; i++) {
+      Future<Long> future = tasks.get(i);
+
+      if (i == prepareTaskIndex) {
+        assertClusterPrepared(future.get());
+        assertRatisLogsCleared();
+      } else {
+        try {
+          // If this throws an exception, it should be an OMException
+          // indicating failure because the cluster was already prepared.
+          // If no exception is thrown, the volume should be created.
+          future.get();
+          String volumeName = VOLUME + i;
+          Assert.assertTrue(clientProtocol.listVolumes(volumeName, "", 1)
+              .stream()
+              .anyMatch((vol) -> vol.getName().equals(volumeName)));
+        } catch (ExecutionException ex) {
+          Throwable cause = ex.getCause();
+          Assert.assertTrue(cause instanceof OMException);
+          Assert.assertEquals(OMException.ResultCodes.NOT_SUPPORTED_OPERATION,
+              ((OMException) cause).getResult());
+        }
       }
     }

Review comment:
       Can you close executorService after the test?

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
##########
@@ -128,10 +129,25 @@ private static long nextCallId() {
    * @throws ServiceException
    */
   public OMResponse submitRequest(OMRequest omRequest) throws ServiceException {
-    RaftClientRequest raftClientRequest =
-        createWriteRaftClientRequest(omRequest);
-    RaftClientReply raftClientReply = submitRequestToRatis(raftClientRequest);
-    return processReply(omRequest, raftClientReply);
+    // In prepare mode, only prepare and cancel requests are allowed to go
+    // through.
+    if (OzoneManagerPrepareState.requestAllowed(omRequest.getCmdType())) {
+      RaftClientRequest raftClientRequest =
+          createWriteRaftClientRequest(omRequest);
+      RaftClientReply raftClientReply = submitRequestToRatis(raftClientRequest);
+
+      return processReply(omRequest, raftClientReply);
+    } else {
+      String message = "Cannot apply write request " +
+          omRequest.getCmdType().name() + " when OM is in prepare mode.";
+      OMResponse.Builder omResponse = OMResponse.newBuilder()
+          .setMessage(message)
+          .setStatus(OzoneManagerProtocolProtos.Status.NOT_SUPPORTED_OPERATION)

Review comment:
       Can we use a better status name for this, like OM_IN_PREPARE_MODE? 

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
##########
@@ -184,26 +188,18 @@ private static void waitForLogIndex(long indexToWaitFor,
 
   /**
    * Take a snapshot of the state machine at the last index, and purge ALL logs.
-   * @param impl RaftServerImpl instance
+   * @param division Raft server division.
    * @throws IOException on Error.
    */
-  public static long takeSnapshotAndPurgeLogs(RaftServerImpl impl)
+  public static long takeSnapshotAndPurgeLogs(RaftServer.Division division)
       throws IOException {
 
-    StateMachine stateMachine = impl.getStateMachine();
+    StateMachine stateMachine = division.getStateMachine();
     long snapshotIndex = stateMachine.takeSnapshot();
-    RaftLog raftLog = impl.getState().getLog();
-    long raftLogIndex = raftLog.getLastEntryTermIndex().getIndex();
-
-    // Ensure that Ratis's in memory snapshot index is the same as the index
-    // of its last log entry.
-    if (snapshotIndex != raftLogIndex) {
-      throw new IOException("Snapshot index " + snapshotIndex + " does not " +
-          "match last log index " + raftLogIndex);
-    }

Review comment:
       I don't fully understand this: why are we removing this safety check?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org