You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2017/08/30 22:58:47 UTC
[12/50] [abbrv] hadoop git commit: HDFS-12356. Unit test for JournalNode sync during Rolling Upgrade. Contributed by Hanisha Koneru.
HDFS-12356. Unit test for JournalNode sync during Rolling Upgrade. Contributed by Hanisha Koneru.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fd66a243
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fd66a243
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fd66a243
Branch: refs/heads/YARN-3926
Commit: fd66a243bfffc8260bfd69058625d4d9509cafe6
Parents: d04f85f
Author: Arpit Agarwal <ar...@apache.org>
Authored: Wed Aug 30 10:29:42 2017 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Wed Aug 30 10:29:42 2017 -0700
----------------------------------------------------------------------
.../qjournal/server/TestJournalNodeSync.java | 176 +++++++++++++++----
1 file changed, 137 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd66a243/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
index 2964f05..09ef3a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
@@ -20,11 +20,13 @@ package org.apache.hadoop.hdfs.qjournal.server;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@@ -54,10 +56,11 @@ public class TestJournalNodeSync {
private MiniQJMHACluster qjmhaCluster;
private MiniDFSCluster dfsCluster;
private MiniJournalCluster jCluster;
- private FileSystem fs;
private FSNamesystem namesystem;
private int editsPerformed = 0;
private final String jid = "ns1";
+ private int activeNNindex=0;
+ private static final int DFS_HA_TAILEDITS_PERIOD_SECONDS=1;
@Rule
public TestName testName = new TestName();
@@ -71,13 +74,16 @@ public class TestJournalNodeSync {
"testSyncAfterJNdowntimeWithoutQJournalQueue")) {
conf.setInt(DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY, 0);
}
+ if (testName.getMethodName().equals("testSyncDuringRollingUpgrade")) {
+ conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY,
+ DFS_HA_TAILEDITS_PERIOD_SECONDS);
+ }
qjmhaCluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(2)
.build();
dfsCluster = qjmhaCluster.getDfsCluster();
jCluster = qjmhaCluster.getJournalCluster();
dfsCluster.transitionToActive(0);
- fs = dfsCluster.getFileSystem(0);
namesystem = dfsCluster.getNamesystem(0);
}
@@ -192,36 +198,7 @@ public class TestJournalNodeSync {
// the journals.
@Test(timeout=60000)
public void testRandomJournalMissingLogs() throws Exception {
- Random randomJournal = new Random();
-
- List<File> journalCurrentDirs = Lists.newArrayList();
-
- for (int i = 0; i < 3; i++) {
- journalCurrentDirs.add(new StorageDirectory(jCluster.getJournalDir(i,
- jid)).getCurrentDir());
- }
-
- int count = 0;
- long lastStartTxId;
- int journalIndex;
- List<File> missingLogs = Lists.newArrayList();
- while (count < 5) {
- lastStartTxId = generateEditLog();
-
- // Delete the last edit log segment from randomly selected journal node
- journalIndex = randomJournal.nextInt(3);
- missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex),
- lastStartTxId));
-
- // Delete the last edit log segment from two journals for some logs
- if (count % 2 == 0) {
- journalIndex = (journalIndex + 1) % 3;
- missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex),
- lastStartTxId));
- }
-
- count++;
- }
+ List<File> missingLogs = deleteEditLogsFromRandomJN();
GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
}
@@ -277,7 +254,8 @@ public class TestJournalNodeSync {
*/
@Test (timeout=300_000)
public void testSyncAfterJNdowntimeWithoutQJournalQueue() throws Exception{
- // Queuing is disabled during the cluster setup {@link #setUpMiniCluster()}
+ // QJournal Queuing is disabled during the cluster setup
+ // {@link #setUpMiniCluster()}
File firstJournalDir = jCluster.getJournalDir(0, jid);
File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
.getCurrentDir();
@@ -376,11 +354,88 @@ public class TestJournalNodeSync {
GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
}
+ // Test JournalNode Sync during a Rolling Upgrade of NN.
+ @Test (timeout=300_000)
+ public void testSyncDuringRollingUpgrade() throws Exception {
+
+ DistributedFileSystem dfsActive;
+ int standbyNNindex;
+
+ if (dfsCluster.getNameNode(0).isActiveState()) {
+ activeNNindex = 0;
+ standbyNNindex = 1;
+ } else {
+ activeNNindex = 1;
+ standbyNNindex = 0;
+ }
+ dfsActive = dfsCluster.getFileSystem(activeNNindex);
+
+ // Prepare for rolling upgrade
+ final RollingUpgradeInfo info = dfsActive.rollingUpgrade(
+ HdfsConstants.RollingUpgradeAction.PREPARE);
+
+ //query rolling upgrade
+ Assert.assertEquals(info, dfsActive.rollingUpgrade(
+ HdfsConstants.RollingUpgradeAction.QUERY));
+
+ // Restart the Standby NN with rollingUpgrade option
+ dfsCluster.restartNameNode(standbyNNindex, true,
+ "-rollingUpgrade", "started");
+ Assert.assertEquals(info, dfsActive.rollingUpgrade(
+ HdfsConstants.RollingUpgradeAction.QUERY));
+
+ // Do some edits and delete some edit logs
+ List<File> missingLogs = deleteEditLogsFromRandomJN();
+
+ GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
+
+ // Transition the active NN to standby and standby to active
+ dfsCluster.transitionToStandby(activeNNindex);
+
+ // Let Standby NN catch up tailing edit logs before transitioning it to
+ // active
+ Thread.sleep(30*DFS_HA_TAILEDITS_PERIOD_SECONDS*1000);
+
+ dfsCluster.transitionToActive(standbyNNindex);
+ dfsCluster.waitActive();
+
+ activeNNindex=standbyNNindex;
+ standbyNNindex=((activeNNindex+1)%2);
+ dfsActive = dfsCluster.getFileSystem(activeNNindex);
+
+ Assert.assertTrue(dfsCluster.getNameNode(activeNNindex).isActiveState());
+ Assert.assertFalse(dfsCluster.getNameNode(standbyNNindex).isActiveState());
+
+ // Restart the current standby NN (previously active)
+ dfsCluster.restartNameNode(standbyNNindex, true,
+ "-rollingUpgrade", "started");
+ Assert.assertEquals(info, dfsActive.rollingUpgrade(
+ HdfsConstants.RollingUpgradeAction.QUERY));
+ dfsCluster.waitActive();
+
+ // Do some edits and delete some edit logs
+ missingLogs.addAll(deleteEditLogsFromRandomJN());
+
+ // Check that JNSync downloaded the edit logs rolled during rolling upgrade.
+ GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
+
+ //finalize rolling upgrade
+ final RollingUpgradeInfo finalize = dfsActive.rollingUpgrade(
+ HdfsConstants.RollingUpgradeAction.FINALIZE);
+ Assert.assertTrue(finalize.isFinalized());
+
+ // Check the missing edit logs exist after finalizing rolling upgrade
+ for (File editLog : missingLogs) {
+ Assert.assertTrue("Edit log missing after finalizing rolling upgrade",
+ editLog.exists());
+ }
+ }
+
private File deleteEditLog(File currentDir, long startTxId)
throws IOException {
EditLogFile logFile = getLogFile(currentDir, startTxId);
while (logFile.isInProgress()) {
- dfsCluster.getNameNode(0).getRpcServer().rollEditLog();
+ dfsCluster.getNameNode(activeNNindex).getRpcServer().rollEditLog();
logFile = getLogFile(currentDir, startTxId);
}
File deleteFile = logFile.getFile();
@@ -389,13 +444,55 @@ public class TestJournalNodeSync {
return deleteFile;
}
+ private List<File> deleteEditLogsFromRandomJN() throws IOException {
+ Random random = new Random();
+
+ List<File> journalCurrentDirs = Lists.newArrayList();
+
+ for (int i = 0; i < 3; i++) {
+ journalCurrentDirs.add(new StorageDirectory(jCluster.getJournalDir(i,
+ jid)).getCurrentDir());
+ }
+
+ long[] startTxIds = new long[20];
+ for (int i = 0; i < 20; i++) {
+ startTxIds[i] = generateEditLog();
+ }
+
+ int count = 0, startTxIdIndex;
+ long startTxId;
+ int journalIndex;
+ List<File> missingLogs = Lists.newArrayList();
+ List<Integer> deletedStartTxIds = Lists.newArrayList();
+ while (count < 5) {
+ // Select a random edit log to delete
+ startTxIdIndex = random.nextInt(20);
+ while (deletedStartTxIds.contains(startTxIdIndex)) {
+ startTxIdIndex = random.nextInt(20);
+ }
+ startTxId = startTxIds[startTxIdIndex];
+ deletedStartTxIds.add(startTxIdIndex);
+
+ // Delete the randomly selected edit log segment from randomly selected
+ // journal node
+ journalIndex = random.nextInt(3);
+ missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex),
+ startTxId));
+
+ count++;
+ }
+
+ return missingLogs;
+ }
+
/**
* Do a mutative metadata operation on the file system.
*
* @return true if the operation was successful, false otherwise.
*/
private boolean doAnEdit() throws IOException {
- return fs.mkdirs(new Path("/tmp", Integer.toString(editsPerformed++)));
+ return dfsCluster.getFileSystem(activeNNindex).mkdirs(
+ new Path("/tmp", Integer.toString(editsPerformed++)));
}
/**
@@ -414,12 +511,13 @@ public class TestJournalNodeSync {
* @return the startTxId of next segment after rolling edits.
*/
private long generateEditLog(int numEdits) throws IOException {
- long startTxId = namesystem.getFSImage().getEditLog().getLastWrittenTxId();
+ long lastWrittenTxId = dfsCluster.getNameNode(activeNNindex).getFSImage()
+ .getEditLog().getLastWrittenTxId();
for (int i = 1; i <= numEdits; i++) {
Assert.assertTrue("Failed to do an edit", doAnEdit());
}
- dfsCluster.getNameNode(0).getRpcServer().rollEditLog();
- return startTxId;
+ dfsCluster.getNameNode(activeNNindex).getRpcServer().rollEditLog();
+ return lastWrittenTxId;
}
private Supplier<Boolean> editLogExists(List<File> editLogs) {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org