Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2017/08/04 19:52:02 UTC

hadoop git commit: HDFS-12224. Add tests to TestJournalNodeSync for sync after JN downtime. Contributed by Hanisha Koneru.

Repository: hadoop
Updated Branches:
  refs/heads/trunk fe3341786 -> bbc6d254c


HDFS-12224. Add tests to TestJournalNodeSync for sync after JN downtime. Contributed by Hanisha Koneru.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bbc6d254
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bbc6d254
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bbc6d254

Branch: refs/heads/trunk
Commit: bbc6d254c8a953abba69415d80edeede3ee6269d
Parents: fe33417
Author: Arpit Agarwal <ar...@apache.org>
Authored: Fri Aug 4 12:51:33 2017 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Fri Aug 4 12:51:33 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/qjournal/server/Journal.java    |   3 +-
 .../hdfs/qjournal/server/JournalMetrics.java    |  11 +
 .../hdfs/qjournal/server/JournalNodeSyncer.java |   4 +
 .../hdfs/qjournal/TestJournalNodeSync.java      | 265 -----------
 .../hdfs/qjournal/server/TestJournalNode.java   |   6 +-
 .../qjournal/server/TestJournalNodeSync.java    | 439 +++++++++++++++++++
 6 files changed, 458 insertions(+), 270 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbc6d254/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
index 0041d5e..0f4091d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
@@ -286,8 +286,7 @@ public class Journal implements Closeable {
     fjm.setLastReadableTxId(val);
   }
 
-  @VisibleForTesting
-  JournalMetrics getMetricsForTests() {
+  JournalMetrics getMetrics() {
     return metrics;
   }
 

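For context on the rename above: the accessor loses its test-only name because it now has a production caller. JournalNodeSyncer caches the Journal's metrics object (see its hunk below), and the existing test caller in TestJournalNode.java simply picks up the new name. A minimal sketch of the two call sites, assuming both callers can reach the package-private accessor from org.apache.hadoop.hdfs.qjournal.server:

    // Production caller (JournalNodeSyncer constructor, per the hunk below):
    //   metrics = journal.getMetrics();
    //
    // Test caller (TestJournalNode, per the hunk below):
    //   MetricsRecordBuilder metrics =
    //       MetricsAsserts.getMetrics(journal.getMetrics().getName());
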
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbc6d254/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
index cffe2c1..fcfd901 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
@@ -45,6 +45,9 @@ class JournalMetrics {
   
   @Metric("Number of batches written where this node was lagging")
   MutableCounterLong batchesWrittenWhileLagging;
+
+  @Metric("Number of edit logs downloaded by JournalNodeSyncer")
+  private MutableCounterLong numEditLogsSynced;
   
   private final int[] QUANTILE_INTERVALS = new int[] {
       1*60, // 1m
@@ -120,4 +123,12 @@ class JournalMetrics {
       q.add(us);
     }
   }
+
+  public MutableCounterLong getNumEditLogsSynced() {
+    return numEditLogsSynced;
+  }
+
+  public void incrNumEditLogsSynced() {
+    numEditLogsSynced.incr();
+  }
 }
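
The counter is registered through the same metrics2 machinery as the other fields in this class, so a test should be able to assert it by name. A hedged sketch, assuming the registered name is "NumEditLogsSynced" (metrics2 conventionally capitalizes the annotated field name; the hunk itself only shows the field):

    import org.apache.hadoop.metrics2.MetricsRecordBuilder;
    import org.apache.hadoop.test.MetricsAsserts;

    // Hedged sketch: after one successful segment download, the counter
    // should read 1. "NumEditLogsSynced" is the assumed metrics2 name for
    // the field numEditLogsSynced; journal is a Journal instance as in
    // TestJournalNode below.
    @Test
    public void testEditLogsSyncedCounter() throws Exception {
      MetricsRecordBuilder rb =
          MetricsAsserts.getMetrics(journal.getMetrics().getName());
      MetricsAsserts.assertCounter("NumEditLogsSynced", 1L, rb);
    }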

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbc6d254/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
index 479f6a0..537ba0a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
@@ -77,6 +77,7 @@ public class JournalNodeSyncer {
   private final long journalSyncInterval;
   private final int logSegmentTransferTimeout;
   private final DataTransferThrottler throttler;
+  private final JournalMetrics metrics;
 
   JournalNodeSyncer(JournalNode journalNode, Journal journal, String jid,
       Configuration conf) {
@@ -93,6 +94,7 @@ public class JournalNodeSyncer {
         DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY,
         DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_DEFAULT);
     throttler = getThrottler(conf);
+    metrics = journal.getMetrics();
   }
 
   void stopSync() {
@@ -411,6 +413,8 @@ public class JournalNodeSyncer {
         LOG.warn("Deleting " + tmpEditsFile + " has failed");
       }
       return false;
+    } else {
+      metrics.incrNumEditLogsSynced();
     }
     return true;
   }
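
Taken together, the two hunks wire the counter in at construction time and bump it only on the success path of the download helper: the false branch cleans up the temporary file and returns without counting. A condensed sketch of that shape; the names not shown in the diff are hypothetical stand-ins:

    // Condensed sketch, not the actual method. metrics comes from
    // journal.getMetrics() as shown above; moveSucceeded and tmpEditsFile
    // are hypothetical stand-ins for the elided download/rename logic.
    private boolean finishSegmentDownload(boolean moveSucceeded,
        java.io.File tmpEditsFile) {
      if (!moveSucceeded) {
        if (!tmpEditsFile.delete()) {
          LOG.warn("Deleting " + tmpEditsFile + " has failed");
        }
        return false;               // nothing moved into place, nothing counted
      }
      metrics.incrNumEditLogsSynced();  // one edit log segment synced
      return true;
    }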

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbc6d254/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java
deleted file mode 100644
index 8415a6f..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.qjournal;
-
-import com.google.common.base.Supplier;
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
-import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager
-    .getLogFile;
-
-import org.apache.hadoop.test.GenericTestUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Random;
-
-/**
- * Unit test for Journal Node formatting upon re-installation and syncing.
- */
-public class TestJournalNodeSync {
-  private MiniQJMHACluster qjmhaCluster;
-  private MiniDFSCluster dfsCluster;
-  private MiniJournalCluster jCluster;
-  private FileSystem fs;
-  private FSNamesystem namesystem;
-  private int editsPerformed = 0;
-  private final String jid = "ns1";
-
-  @Before
-  public void setUpMiniCluster() throws IOException {
-    final Configuration conf = new HdfsConfiguration();
-    conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, true);
-    conf.setLong(DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY, 1000L);
-    qjmhaCluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(2)
-      .build();
-    dfsCluster = qjmhaCluster.getDfsCluster();
-    jCluster = qjmhaCluster.getJournalCluster();
-
-    dfsCluster.transitionToActive(0);
-    fs = dfsCluster.getFileSystem(0);
-    namesystem = dfsCluster.getNamesystem(0);
-  }
-
-  @After
-  public void shutDownMiniCluster() throws IOException {
-    if (qjmhaCluster != null) {
-      qjmhaCluster.shutdown();
-    }
-  }
-
-  @Test(timeout=30000)
-  public void testJournalNodeSync() throws Exception {
-    File firstJournalDir = jCluster.getJournalDir(0, jid);
-    File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
-        .getCurrentDir();
-
-    // Generate some edit logs and delete one.
-    long firstTxId = generateEditLog();
-    generateEditLog();
-
-    File missingLog = deleteEditLog(firstJournalCurrentDir, firstTxId);
-
-    GenericTestUtils.waitFor(editLogExists(Lists.newArrayList(missingLog)),
-        500, 10000);
-  }
-
-  @Test(timeout=30000)
-  public void testSyncForMultipleMissingLogs() throws Exception {
-    File firstJournalDir = jCluster.getJournalDir(0, jid);
-    File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
-        .getCurrentDir();
-
-    // Generate some edit logs and delete two.
-    long firstTxId = generateEditLog();
-    long nextTxId = generateEditLog();
-
-    List<File> missingLogs = Lists.newArrayList();
-    missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
-    missingLogs.add(deleteEditLog(firstJournalCurrentDir, nextTxId));
-
-    GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000);
-  }
-
-  @Test(timeout=30000)
-  public void testSyncForDiscontinuousMissingLogs() throws Exception {
-    File firstJournalDir = jCluster.getJournalDir(0, jid);
-    File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
-        .getCurrentDir();
-
-    // Generate some edit logs and delete two discontinuous logs.
-    long firstTxId = generateEditLog();
-    generateEditLog();
-    long nextTxId = generateEditLog();
-
-    List<File> missingLogs = Lists.newArrayList();
-    missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
-    missingLogs.add(deleteEditLog(firstJournalCurrentDir, nextTxId));
-
-    GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000);
-  }
-
-  @Test(timeout=30000)
-  public void testMultipleJournalsMissingLogs() throws Exception {
-    File firstJournalDir = jCluster.getJournalDir(0, jid);
-    File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
-        .getCurrentDir();
-
-    File secondJournalDir = jCluster.getJournalDir(1, jid);
-    StorageDirectory sd = new StorageDirectory(secondJournalDir);
-    File secondJournalCurrentDir = sd.getCurrentDir();
-
-    // Generate some edit logs and delete one log from two journals.
-    long firstTxId = generateEditLog();
-    generateEditLog();
-
-    List<File> missingLogs = Lists.newArrayList();
-    missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
-    missingLogs.add(deleteEditLog(secondJournalCurrentDir, firstTxId));
-
-    GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000);
-  }
-
-  @Test(timeout=60000)
-  public void testMultipleJournalsMultipleMissingLogs() throws Exception {
-    File firstJournalDir = jCluster.getJournalDir(0, jid);
-    File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
-        .getCurrentDir();
-
-    File secondJournalDir = jCluster.getJournalDir(1, jid);
-    File secondJournalCurrentDir = new StorageDirectory(secondJournalDir)
-        .getCurrentDir();
-
-    File thirdJournalDir = jCluster.getJournalDir(2, jid);
-    File thirdJournalCurrentDir = new StorageDirectory(thirdJournalDir)
-        .getCurrentDir();
-
-    // Generate some edit logs and delete multiple logs in multiple journals.
-    long firstTxId = generateEditLog();
-    long secondTxId = generateEditLog();
-    long thirdTxId = generateEditLog();
-
-    List<File> missingLogs = Lists.newArrayList();
-    missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
-    missingLogs.add(deleteEditLog(secondJournalCurrentDir, firstTxId));
-    missingLogs.add(deleteEditLog(secondJournalCurrentDir, secondTxId));
-    missingLogs.add(deleteEditLog(thirdJournalCurrentDir, thirdTxId));
-
-    GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
-  }
-
-  // Test JournalNode Sync by randomly deleting edit logs from one or two of
-  // the journals.
-  @Test(timeout=60000)
-  public void testRandomJournalMissingLogs() throws Exception {
-    Random randomJournal = new Random();
-
-    List<File> journalCurrentDirs = Lists.newArrayList();
-
-    for (int i = 0; i < 3; i++) {
-      journalCurrentDirs.add(new StorageDirectory(jCluster.getJournalDir(i,
-          jid)).getCurrentDir());
-    }
-
-    int count = 0;
-    long lastStartTxId;
-    int journalIndex;
-    List<File> missingLogs = Lists.newArrayList();
-    while (count < 5) {
-      lastStartTxId = generateEditLog();
-
-      // Delete the last edit log segment from randomly selected journal node
-      journalIndex = randomJournal.nextInt(3);
-      missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex),
-          lastStartTxId));
-
-      // Delete the last edit log segment from two journals for some logs
-      if (count % 2 == 0) {
-        journalIndex = (journalIndex + 1) % 3;
-        missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex),
-            lastStartTxId));
-      }
-
-      count++;
-    }
-
-    GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
-  }
-
-  private File deleteEditLog(File currentDir, long startTxId)
-      throws IOException {
-    EditLogFile logFile = getLogFile(currentDir, startTxId);
-    while (logFile.isInProgress()) {
-      dfsCluster.getNameNode(0).getRpcServer().rollEditLog();
-      logFile = getLogFile(currentDir, startTxId);
-    }
-    File deleteFile = logFile.getFile();
-    Assert.assertTrue("Couldn't delete edit log file", deleteFile.delete());
-
-    return deleteFile;
-  }
-
-  /**
-   * Do a mutative metadata operation on the file system.
-   *
-   * @return true if the operation was successful, false otherwise.
-   */
-  private boolean doAnEdit() throws IOException {
-    return fs.mkdirs(new Path("/tmp", Integer.toString(editsPerformed++)));
-  }
-
-  /**
-   * Does an edit and rolls the Edit Log.
-   *
-   * @return the startTxId of next segment after rolling edits.
-   */
-  private long generateEditLog() throws IOException {
-    long startTxId = namesystem.getFSImage().getEditLog().getLastWrittenTxId();
-    Assert.assertTrue("Failed to do an edit", doAnEdit());
-    dfsCluster.getNameNode(0).getRpcServer().rollEditLog();
-    return startTxId;
-  }
-
-  private Supplier<Boolean> editLogExists(List<File> editLogs) {
-    Supplier<Boolean> supplier = new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        for (File editLog : editLogs) {
-          if (!editLog.exists()) {
-            return false;
-          }
-        }
-        return true;
-      }
-    };
-    return supplier;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbc6d254/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
index 9dd6846..28ec708 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
@@ -102,7 +102,7 @@ public class TestJournalNode {
   @Test(timeout=100000)
   public void testJournal() throws Exception {
     MetricsRecordBuilder metrics = MetricsAsserts.getMetrics(
-        journal.getMetricsForTests().getName());
+        journal.getMetrics().getName());
     MetricsAsserts.assertCounter("BatchesWritten", 0L, metrics);
     MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
     MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);
@@ -117,7 +117,7 @@ public class TestJournalNode {
     ch.sendEdits(1L, 1, 1, "hello".getBytes(Charsets.UTF_8)).get();
     
     metrics = MetricsAsserts.getMetrics(
-        journal.getMetricsForTests().getName());
+        journal.getMetrics().getName());
     MetricsAsserts.assertCounter("BatchesWritten", 1L, metrics);
     MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
     MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);
@@ -130,7 +130,7 @@ public class TestJournalNode {
     ch.sendEdits(1L, 2, 1, "goodbye".getBytes(Charsets.UTF_8)).get();
 
     metrics = MetricsAsserts.getMetrics(
-        journal.getMetricsForTests().getName());
+        journal.getMetrics().getName());
     MetricsAsserts.assertCounter("BatchesWritten", 2L, metrics);
     MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 1L, metrics);
     MetricsAsserts.assertGauge("CurrentLagTxns", 98L, metrics);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbc6d254/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
new file mode 100644
index 0000000..2964f05
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
@@ -0,0 +1,439 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
+import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager
+    .getLogFile;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Unit test for Journal Node formatting upon re-installation and syncing.
+ */
+public class TestJournalNodeSync {
+  private Configuration conf;
+  private MiniQJMHACluster qjmhaCluster;
+  private MiniDFSCluster dfsCluster;
+  private MiniJournalCluster jCluster;
+  private FileSystem fs;
+  private FSNamesystem namesystem;
+  private int editsPerformed = 0;
+  private final String jid = "ns1";
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @Before
+  public void setUpMiniCluster() throws IOException {
+    conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, true);
+    conf.setLong(DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY, 1000L);
+    if (testName.getMethodName().equals(
+        "testSyncAfterJNdowntimeWithoutQJournalQueue")) {
+      conf.setInt(DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY, 0);
+    }
+    qjmhaCluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(2)
+      .build();
+    dfsCluster = qjmhaCluster.getDfsCluster();
+    jCluster = qjmhaCluster.getJournalCluster();
+
+    dfsCluster.transitionToActive(0);
+    fs = dfsCluster.getFileSystem(0);
+    namesystem = dfsCluster.getNamesystem(0);
+  }
+
+  @After
+  public void shutDownMiniCluster() throws IOException {
+    if (qjmhaCluster != null) {
+      qjmhaCluster.shutdown();
+    }
+  }
+
+  @Test(timeout=30000)
+  public void testJournalNodeSync() throws Exception {
+    File firstJournalDir = jCluster.getJournalDir(0, jid);
+    File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+        .getCurrentDir();
+
+    // Generate some edit logs and delete one.
+    long firstTxId = generateEditLog();
+    generateEditLog();
+
+    File missingLog = deleteEditLog(firstJournalCurrentDir, firstTxId);
+
+    GenericTestUtils.waitFor(editLogExists(Lists.newArrayList(missingLog)),
+        500, 10000);
+  }
+
+  @Test(timeout=30000)
+  public void testSyncForMultipleMissingLogs() throws Exception {
+    File firstJournalDir = jCluster.getJournalDir(0, jid);
+    File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+        .getCurrentDir();
+
+    // Generate some edit logs and delete two.
+    long firstTxId = generateEditLog();
+    long nextTxId = generateEditLog();
+
+    List<File> missingLogs = Lists.newArrayList();
+    missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
+    missingLogs.add(deleteEditLog(firstJournalCurrentDir, nextTxId));
+
+    GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000);
+  }
+
+  @Test(timeout=30000)
+  public void testSyncForDiscontinuousMissingLogs() throws Exception {
+    File firstJournalDir = jCluster.getJournalDir(0, jid);
+    File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+        .getCurrentDir();
+
+    // Generate some edit logs and delete two discontinuous logs.
+    long firstTxId = generateEditLog();
+    generateEditLog();
+    long nextTxId = generateEditLog();
+
+    List<File> missingLogs = Lists.newArrayList();
+    missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
+    missingLogs.add(deleteEditLog(firstJournalCurrentDir, nextTxId));
+
+    GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000);
+  }
+
+  @Test(timeout=30000)
+  public void testMultipleJournalsMissingLogs() throws Exception {
+    File firstJournalDir = jCluster.getJournalDir(0, jid);
+    File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+        .getCurrentDir();
+
+    File secondJournalDir = jCluster.getJournalDir(1, jid);
+    StorageDirectory sd = new StorageDirectory(secondJournalDir);
+    File secondJournalCurrentDir = sd.getCurrentDir();
+
+    // Generate some edit logs and delete one log from two journals.
+    long firstTxId = generateEditLog();
+    generateEditLog();
+
+    List<File> missingLogs = Lists.newArrayList();
+    missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
+    missingLogs.add(deleteEditLog(secondJournalCurrentDir, firstTxId));
+
+    GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000);
+  }
+
+  @Test(timeout=60000)
+  public void testMultipleJournalsMultipleMissingLogs() throws Exception {
+    File firstJournalDir = jCluster.getJournalDir(0, jid);
+    File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+        .getCurrentDir();
+
+    File secondJournalDir = jCluster.getJournalDir(1, jid);
+    File secondJournalCurrentDir = new StorageDirectory(secondJournalDir)
+        .getCurrentDir();
+
+    File thirdJournalDir = jCluster.getJournalDir(2, jid);
+    File thirdJournalCurrentDir = new StorageDirectory(thirdJournalDir)
+        .getCurrentDir();
+
+    // Generate some edit logs and delete multiple logs in multiple journals.
+    long firstTxId = generateEditLog();
+    long secondTxId = generateEditLog();
+    long thirdTxId = generateEditLog();
+
+    List<File> missingLogs = Lists.newArrayList();
+    missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
+    missingLogs.add(deleteEditLog(secondJournalCurrentDir, firstTxId));
+    missingLogs.add(deleteEditLog(secondJournalCurrentDir, secondTxId));
+    missingLogs.add(deleteEditLog(thirdJournalCurrentDir, thirdTxId));
+
+    GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
+  }
+
+  // Test JournalNode Sync by randomly deleting edit logs from one or two of
+  // the journals.
+  @Test(timeout=60000)
+  public void testRandomJournalMissingLogs() throws Exception {
+    Random randomJournal = new Random();
+
+    List<File> journalCurrentDirs = Lists.newArrayList();
+
+    for (int i = 0; i < 3; i++) {
+      journalCurrentDirs.add(new StorageDirectory(jCluster.getJournalDir(i,
+          jid)).getCurrentDir());
+    }
+
+    int count = 0;
+    long lastStartTxId;
+    int journalIndex;
+    List<File> missingLogs = Lists.newArrayList();
+    while (count < 5) {
+      lastStartTxId = generateEditLog();
+
+      // Delete the last edit log segment from randomly selected journal node
+      journalIndex = randomJournal.nextInt(3);
+      missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex),
+          lastStartTxId));
+
+      // Delete the last edit log segment from two journals for some logs
+      if (count % 2 == 0) {
+        journalIndex = (journalIndex + 1) % 3;
+        missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex),
+            lastStartTxId));
+      }
+
+      count++;
+    }
+
+    GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
+  }
+
+  // Test JournalNode Sync when a JN is down while the NN is actively
+  // writing logs, and the JN comes back up after some time.
+  @Test(timeout=300_000)
+  public void testSyncAfterJNdowntime() throws Exception {
+    File firstJournalDir = jCluster.getJournalDir(0, jid);
+    File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+        .getCurrentDir();
+    File secondJournalDir = jCluster.getJournalDir(1, jid);
+    File secondJournalCurrentDir = new StorageDirectory(secondJournalDir)
+        .getCurrentDir();
+
+    long[] startTxIds = new long[10];
+
+    startTxIds[0] = generateEditLog();
+    startTxIds[1] = generateEditLog();
+
+    // Stop the first JN
+    jCluster.getJournalNode(0).stop(0);
+
+    // Roll some more edits while the first JN is down
+    for (int i = 2; i < 10; i++) {
+      startTxIds[i] = generateEditLog(5);
+    }
+
+    // Re-start the first JN
+    jCluster.restartJournalNode(0);
+
+    // Roll an edit to update the committed tx id of the first JN
+    generateEditLog();
+
+    // List the edit logs rolled during JN down time.
+    List<File> missingLogs = Lists.newArrayList();
+    for (int i = 2; i < 10; i++) {
+      EditLogFile logFile = getLogFile(secondJournalCurrentDir, startTxIds[i],
+          false);
+      missingLogs.add(new File(firstJournalCurrentDir,
+          logFile.getFile().getName()));
+    }
+
+    // Check that JNSync downloaded the edit logs rolled during JN down time.
+    GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
+  }
+
+  /**
+   * Test JournalNode Sync when a JN is down while the NN is actively
+   * writing logs, and the JN comes back up after some time with edit log
+   * queueing disabled. Queueing is turned off during the cluster setup in
+   * {@link #setUpMiniCluster()}.
+   * @throws Exception
+   */
+  @Test(timeout=300_000)
+  public void testSyncAfterJNdowntimeWithoutQJournalQueue() throws Exception {
+    // Queueing is disabled during the cluster setup; see setUpMiniCluster().
+    File firstJournalDir = jCluster.getJournalDir(0, jid);
+    File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+        .getCurrentDir();
+    File secondJournalDir = jCluster.getJournalDir(1, jid);
+    File secondJournalCurrentDir = new StorageDirectory(secondJournalDir)
+        .getCurrentDir();
+
+    long[] startTxIds = new long[10];
+
+    startTxIds[0] = generateEditLog();
+    startTxIds[1] = generateEditLog(2);
+
+    // Stop the first JN
+    jCluster.getJournalNode(0).stop(0);
+
+    // Roll some more edits while the first JN is down
+    for (int i = 2; i < 10; i++) {
+      startTxIds[i] = generateEditLog(5);
+    }
+
+    // Re-start the first JN
+    jCluster.restartJournalNode(0);
+
+    // After the JN restart, and before another edit is rolled, the missing
+    // edit logs will not be synced, as the committed tx id of the JN will be
+    // less than the start tx ids of the missing edit logs and edit log
+    // queueing has been disabled.
+    // Roll an edit to update the committed tx id of the first JN
+    generateEditLog(2);
+
+    // List the edit logs rolled during JN down time.
+    List<File> missingLogs = Lists.newArrayList();
+    for (int i = 2; i < 10; i++) {
+      EditLogFile logFile = getLogFile(secondJournalCurrentDir, startTxIds[i],
+          false);
+      missingLogs.add(new File(firstJournalCurrentDir,
+          logFile.getFile().getName()));
+    }
+
+    // Check that JNSync downloaded the edit logs rolled during JN down time.
+    GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
+
+    // Check that all the missing edit logs have been downloaded via
+    // JournalNodeSyncer alone (as the edit log queueing has been disabled)
+    long numEditLogsSynced = jCluster.getJournalNode(0).getOrCreateJournal(jid)
+        .getMetrics().getNumEditLogsSynced().value();
+    Assert.assertTrue("Edit logs downloaded outside syncer. Expected 8 or " +
+            "more downloads, got " + numEditLogsSynced + " downloads instead",
+        numEditLogsSynced >= 8);
+  }
+
+  // Test JournalNode Sync when a JN is formatted while the NN is actively
+  // writing logs.
+  @Test(timeout=300_000)
+  public void testSyncAfterJNformat() throws Exception {
+    File firstJournalDir = jCluster.getJournalDir(0, jid);
+    File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+        .getCurrentDir();
+    File secondJournalDir = jCluster.getJournalDir(1, jid);
+    File secondJournalCurrentDir = new StorageDirectory(secondJournalDir)
+        .getCurrentDir();
+
+    long[] startTxIds = new long[10];
+
+    startTxIds[0] = generateEditLog(1);
+    startTxIds[1] = generateEditLog(2);
+    startTxIds[2] = generateEditLog(4);
+    startTxIds[3] = generateEditLog(6);
+
+    Journal journal1 = jCluster.getJournalNode(0).getOrCreateJournal(jid);
+    NamespaceInfo nsInfo = journal1.getStorage().getNamespaceInfo();
+
+    // Delete contents of current directory of one JN
+    for (File file : firstJournalCurrentDir.listFiles()) {
+      file.delete();
+    }
+
+    // Format the JN
+    journal1.format(nsInfo);
+
+    // Roll some more edits
+    for (int i = 4; i < 10; i++) {
+      startTxIds[i] = generateEditLog(5);
+    }
+
+    // List the edit logs created before and after the JN was formatted.
+    List<File> missingLogs = Lists.newArrayList();
+    for (int i = 0; i < 10; i++) {
+      EditLogFile logFile = getLogFile(secondJournalCurrentDir, startTxIds[i],
+          false);
+      missingLogs.add(new File(firstJournalCurrentDir,
+          logFile.getFile().getName()));
+    }
+
+    // Check that the formatted JN has all the edit logs.
+    GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
+  }
+
+  private File deleteEditLog(File currentDir, long startTxId)
+      throws IOException {
+    EditLogFile logFile = getLogFile(currentDir, startTxId);
+    while (logFile.isInProgress()) {
+      dfsCluster.getNameNode(0).getRpcServer().rollEditLog();
+      logFile = getLogFile(currentDir, startTxId);
+    }
+    File deleteFile = logFile.getFile();
+    Assert.assertTrue("Couldn't delete edit log file", deleteFile.delete());
+
+    return deleteFile;
+  }
+
+  /**
+   * Do a mutative metadata operation on the file system.
+   *
+   * @return true if the operation was successful, false otherwise.
+   */
+  private boolean doAnEdit() throws IOException {
+    return fs.mkdirs(new Path("/tmp", Integer.toString(editsPerformed++)));
+  }
+
+  /**
+   * Does an edit and rolls the Edit Log.
+   *
+   * @return the startTxId of next segment after rolling edits.
+   */
+  private long generateEditLog() throws IOException {
+    return generateEditLog(1);
+  }
+
+  /**
+   * Does specified number of edits and rolls the Edit Log.
+   *
+   * @param numEdits number of Edits to perform
+   * @return the startTxId of next segment after rolling edits.
+   */
+  private long generateEditLog(int numEdits) throws IOException {
+    long startTxId = namesystem.getFSImage().getEditLog().getLastWrittenTxId();
+    for (int i = 1; i <= numEdits; i++) {
+      Assert.assertTrue("Failed to do an edit", doAnEdit());
+    }
+    dfsCluster.getNameNode(0).getRpcServer().rollEditLog();
+    return startTxId;
+  }
+
+  private Supplier<Boolean> editLogExists(List<File> editLogs) {
+    Supplier<Boolean> supplier = new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        for (File editLog : editLogs) {
+          if (!editLog.exists()) {
+            return false;
+          }
+        }
+        return true;
+      }
+    };
+    return supplier;
+  }
+}
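
Of the new tests, testSyncAfterJNdowntimeWithoutQJournalQueue is the one that exercises the new counter end to end: with DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY set to 0 in setUpMiniCluster(), the eight segments rolled during the downtime can only reach the restarted JN through the syncer. Its final check, condensed from the test above:

    // Condensed from the test above: read the counter off the restarted JN
    // and require that the syncer alone accounts for the missing segments.
    long numEditLogsSynced = jCluster.getJournalNode(0)
        .getOrCreateJournal(jid).getMetrics().getNumEditLogsSynced().value();
    Assert.assertTrue("Expected at least 8 syncer downloads, got "
        + numEditLogsSynced, numEditLogsSynced >= 8);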

