You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2017/08/30 22:58:47 UTC

[12/50] [abbrv] hadoop git commit: HDFS-12356. Unit test for JournalNode sync during Rolling Upgrade. Contributed by Hanisha Koneru.

HDFS-12356. Unit test for JournalNode sync during Rolling Upgrade. Contributed by Hanisha Koneru.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fd66a243
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fd66a243
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fd66a243

Branch: refs/heads/YARN-3926
Commit: fd66a243bfffc8260bfd69058625d4d9509cafe6
Parents: d04f85f
Author: Arpit Agarwal <ar...@apache.org>
Authored: Wed Aug 30 10:29:42 2017 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Wed Aug 30 10:29:42 2017 -0700

----------------------------------------------------------------------
 .../qjournal/server/TestJournalNodeSync.java    | 176 +++++++++++++++----
 1 file changed, 137 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd66a243/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
index 2964f05..09ef3a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
@@ -20,11 +20,13 @@ package org.apache.hadoop.hdfs.qjournal.server;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@@ -54,10 +56,11 @@ public class TestJournalNodeSync {
   private MiniQJMHACluster qjmhaCluster;
   private MiniDFSCluster dfsCluster;
   private MiniJournalCluster jCluster;
-  private FileSystem fs;
   private FSNamesystem namesystem;
   private int editsPerformed = 0;
   private final String jid = "ns1";
+  private int activeNNindex=0;
+  private static final int DFS_HA_TAILEDITS_PERIOD_SECONDS=1;
 
   @Rule
   public TestName testName = new TestName();
@@ -71,13 +74,16 @@ public class TestJournalNodeSync {
         "testSyncAfterJNdowntimeWithoutQJournalQueue")) {
       conf.setInt(DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY, 0);
     }
+    if (testName.getMethodName().equals("testSyncDuringRollingUpgrade")) {
+      conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY,
+          DFS_HA_TAILEDITS_PERIOD_SECONDS);
+    }
     qjmhaCluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(2)
       .build();
     dfsCluster = qjmhaCluster.getDfsCluster();
     jCluster = qjmhaCluster.getJournalCluster();
 
     dfsCluster.transitionToActive(0);
-    fs = dfsCluster.getFileSystem(0);
     namesystem = dfsCluster.getNamesystem(0);
   }
 
@@ -192,36 +198,7 @@ public class TestJournalNodeSync {
   // the journals.
   @Test(timeout=60000)
   public void testRandomJournalMissingLogs() throws Exception {
-    Random randomJournal = new Random();
-
-    List<File> journalCurrentDirs = Lists.newArrayList();
-
-    for (int i = 0; i < 3; i++) {
-      journalCurrentDirs.add(new StorageDirectory(jCluster.getJournalDir(i,
-          jid)).getCurrentDir());
-    }
-
-    int count = 0;
-    long lastStartTxId;
-    int journalIndex;
-    List<File> missingLogs = Lists.newArrayList();
-    while (count < 5) {
-      lastStartTxId = generateEditLog();
-
-      // Delete the last edit log segment from randomly selected journal node
-      journalIndex = randomJournal.nextInt(3);
-      missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex),
-          lastStartTxId));
-
-      // Delete the last edit log segment from two journals for some logs
-      if (count % 2 == 0) {
-        journalIndex = (journalIndex + 1) % 3;
-        missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex),
-            lastStartTxId));
-      }
-
-      count++;
-    }
+    List<File> missingLogs = deleteEditLogsFromRandomJN();
 
     GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
   }
@@ -277,7 +254,8 @@ public class TestJournalNodeSync {
    */
   @Test (timeout=300_000)
   public void testSyncAfterJNdowntimeWithoutQJournalQueue() throws Exception{
-    // Queuing is disabled during the cluster setup {@link #setUpMiniCluster()}
+    // QJournal Queuing is disabled during the cluster setup
+    // {@link #setUpMiniCluster()}
     File firstJournalDir = jCluster.getJournalDir(0, jid);
     File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
         .getCurrentDir();
@@ -376,11 +354,88 @@ public class TestJournalNodeSync {
     GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
   }
 
+  // Test JournalNode Sync during a Rolling Upgrade of NN.
+  @Test (timeout=300_000)
+  public void testSyncDuringRollingUpgrade() throws Exception {
+
+    DistributedFileSystem dfsActive;
+    int standbyNNindex;
+
+    if (dfsCluster.getNameNode(0).isActiveState()) {
+      activeNNindex = 0;
+      standbyNNindex = 1;
+    } else {
+      activeNNindex = 1;
+      standbyNNindex = 0;
+    }
+    dfsActive = dfsCluster.getFileSystem(activeNNindex);
+
+    // Prepare for rolling upgrade
+    final RollingUpgradeInfo info = dfsActive.rollingUpgrade(
+          HdfsConstants.RollingUpgradeAction.PREPARE);
+
+    // Query rolling upgrade
+    Assert.assertEquals(info, dfsActive.rollingUpgrade(
+        HdfsConstants.RollingUpgradeAction.QUERY));
+
+    // Restart the Standby NN with rollingUpgrade option
+    dfsCluster.restartNameNode(standbyNNindex, true,
+        "-rollingUpgrade", "started");
+    Assert.assertEquals(info, dfsActive.rollingUpgrade(
+        HdfsConstants.RollingUpgradeAction.QUERY));
+
+    // Do some edits and delete some edit logs
+    List<File> missingLogs = deleteEditLogsFromRandomJN();
+
+    GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
+
+    // Transition the active NN to standby and standby to active
+    dfsCluster.transitionToStandby(activeNNindex);
+
+    // Let Standby NN catch up tailing edit logs before transitioning it to
+    // active
+    Thread.sleep(30*DFS_HA_TAILEDITS_PERIOD_SECONDS*1000);
+
+    dfsCluster.transitionToActive(standbyNNindex);
+    dfsCluster.waitActive();
+
+    activeNNindex=standbyNNindex;
+    standbyNNindex=((activeNNindex+1)%2);
+    dfsActive = dfsCluster.getFileSystem(activeNNindex);
+
+    Assert.assertTrue(dfsCluster.getNameNode(activeNNindex).isActiveState());
+    Assert.assertFalse(dfsCluster.getNameNode(standbyNNindex).isActiveState());
+
+    // Restart the current standby NN (previously active)
+    dfsCluster.restartNameNode(standbyNNindex, true,
+        "-rollingUpgrade", "started");
+    Assert.assertEquals(info, dfsActive.rollingUpgrade(
+        HdfsConstants.RollingUpgradeAction.QUERY));
+    dfsCluster.waitActive();
+
+    // Do some edits and delete some edit logs
+    missingLogs.addAll(deleteEditLogsFromRandomJN());
+
+    // Check that JNSync downloaded the edit logs rolled during rolling upgrade.
+    GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
+
+    // Finalize rolling upgrade
+    final RollingUpgradeInfo finalize = dfsActive.rollingUpgrade(
+        HdfsConstants.RollingUpgradeAction.FINALIZE);
+    Assert.assertTrue(finalize.isFinalized());
+
+    // Check the missing edit logs exist after finalizing rolling upgrade
+    for (File editLog : missingLogs) {
+      Assert.assertTrue("Edit log missing after finalizing rolling upgrade",
+          editLog.exists());
+    }
+  }
+
   private File deleteEditLog(File currentDir, long startTxId)
       throws IOException {
     EditLogFile logFile = getLogFile(currentDir, startTxId);
     while (logFile.isInProgress()) {
-      dfsCluster.getNameNode(0).getRpcServer().rollEditLog();
+      dfsCluster.getNameNode(activeNNindex).getRpcServer().rollEditLog();
       logFile = getLogFile(currentDir, startTxId);
     }
     File deleteFile = logFile.getFile();
@@ -389,13 +444,55 @@ public class TestJournalNodeSync {
     return deleteFile;
   }
 
+  private List<File> deleteEditLogsFromRandomJN() throws IOException {
+    Random random = new Random();
+
+    List<File> journalCurrentDirs = Lists.newArrayList();
+
+    for (int i = 0; i < 3; i++) {
+      journalCurrentDirs.add(new StorageDirectory(jCluster.getJournalDir(i,
+          jid)).getCurrentDir());
+    }
+
+    long[] startTxIds = new long[20];
+    for (int i = 0; i < 20; i++) {
+      startTxIds[i] = generateEditLog();
+    }
+
+    int count = 0, startTxIdIndex;
+    long startTxId;
+    int journalIndex;
+    List<File> missingLogs = Lists.newArrayList();
+    List<Integer> deletedStartTxIds = Lists.newArrayList();
+    while (count < 5) {
+      // Select a random edit log to delete
+      startTxIdIndex = random.nextInt(20);
+      while (deletedStartTxIds.contains(startTxIdIndex)) {
+        startTxIdIndex = random.nextInt(20);
+      }
+      startTxId = startTxIds[startTxIdIndex];
+      deletedStartTxIds.add(startTxIdIndex);
+
+      // Delete the randomly selected edit log segment from randomly selected
+      // journal node
+      journalIndex = random.nextInt(3);
+      missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex),
+          startTxId));
+
+      count++;
+    }
+
+    return missingLogs;
+  }
+
   /**
    * Do a mutative metadata operation on the file system.
    *
    * @return true if the operation was successful, false otherwise.
    */
   private boolean doAnEdit() throws IOException {
-    return fs.mkdirs(new Path("/tmp", Integer.toString(editsPerformed++)));
+    return dfsCluster.getFileSystem(activeNNindex).mkdirs(
+        new Path("/tmp", Integer.toString(editsPerformed++)));
   }
 
   /**
@@ -414,12 +511,13 @@ public class TestJournalNodeSync {
    * @return the startTxId of next segment after rolling edits.
    */
   private long generateEditLog(int numEdits) throws IOException {
-    long startTxId = namesystem.getFSImage().getEditLog().getLastWrittenTxId();
+    long lastWrittenTxId = dfsCluster.getNameNode(activeNNindex).getFSImage()
+        .getEditLog().getLastWrittenTxId();
     for (int i = 1; i <= numEdits; i++) {
       Assert.assertTrue("Failed to do an edit", doAnEdit());
     }
-    dfsCluster.getNameNode(0).getRpcServer().rollEditLog();
-    return startTxId;
+    dfsCluster.getNameNode(activeNNindex).getRpcServer().rollEditLog();
+    return lastWrittenTxId;
   }
 
   private Supplier<Boolean> editLogExists(List<File> editLogs) {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org