You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2017/08/11 17:31:43 UTC
[03/50] [abbrv] hadoop git commit: HDFS-12224. Add tests to
TestJournalNodeSync for sync after JN downtime. Contributed by Hanisha
Koneru.
HDFS-12224. Add tests to TestJournalNodeSync for sync after JN downtime. Contributed by Hanisha Koneru.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bbc6d254
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bbc6d254
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bbc6d254
Branch: refs/heads/YARN-5881
Commit: bbc6d254c8a953abba69415d80edeede3ee6269d
Parents: fe33417
Author: Arpit Agarwal <ar...@apache.org>
Authored: Fri Aug 4 12:51:33 2017 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Fri Aug 4 12:51:33 2017 -0700
----------------------------------------------------------------------
.../hadoop/hdfs/qjournal/server/Journal.java | 3 +-
.../hdfs/qjournal/server/JournalMetrics.java | 11 +
.../hdfs/qjournal/server/JournalNodeSyncer.java | 4 +
.../hdfs/qjournal/TestJournalNodeSync.java | 265 -----------
.../hdfs/qjournal/server/TestJournalNode.java | 6 +-
.../qjournal/server/TestJournalNodeSync.java | 439 +++++++++++++++++++
6 files changed, 458 insertions(+), 270 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbc6d254/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
index 0041d5e..0f4091d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
@@ -286,8 +286,7 @@ public class Journal implements Closeable {
fjm.setLastReadableTxId(val);
}
- @VisibleForTesting
- JournalMetrics getMetricsForTests() {
+ JournalMetrics getMetrics() {
return metrics;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbc6d254/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
index cffe2c1..fcfd901 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
@@ -45,6 +45,9 @@ class JournalMetrics {
@Metric("Number of batches written where this node was lagging")
MutableCounterLong batchesWrittenWhileLagging;
+
+ @Metric("Number of edit logs downloaded by JournalNodeSyncer")
+ private MutableCounterLong numEditLogsSynced;
private final int[] QUANTILE_INTERVALS = new int[] {
1*60, // 1m
@@ -120,4 +123,12 @@ class JournalMetrics {
q.add(us);
}
}
+
+ public MutableCounterLong getNumEditLogsSynced() {
+ return numEditLogsSynced;
+ }
+
+ public void incrNumEditLogsSynced() {
+ numEditLogsSynced.incr();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbc6d254/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
index 479f6a0..537ba0a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
@@ -77,6 +77,7 @@ public class JournalNodeSyncer {
private final long journalSyncInterval;
private final int logSegmentTransferTimeout;
private final DataTransferThrottler throttler;
+ private final JournalMetrics metrics;
JournalNodeSyncer(JournalNode jouranlNode, Journal journal, String jid,
Configuration conf) {
@@ -93,6 +94,7 @@ public class JournalNodeSyncer {
DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY,
DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_DEFAULT);
throttler = getThrottler(conf);
+ metrics = journal.getMetrics();
}
void stopSync() {
@@ -411,6 +413,8 @@ public class JournalNodeSyncer {
LOG.warn("Deleting " + tmpEditsFile + " has failed");
}
return false;
+ } else {
+ metrics.incrNumEditLogsSynced();
}
return true;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbc6d254/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java
deleted file mode 100644
index 8415a6f..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.qjournal;
-
-import com.google.common.base.Supplier;
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
-import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager
- .getLogFile;
-
-import org.apache.hadoop.test.GenericTestUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Random;
-
-/**
- * Unit test for Journal Node formatting upon re-installation and syncing.
- */
-public class TestJournalNodeSync {
- private MiniQJMHACluster qjmhaCluster;
- private MiniDFSCluster dfsCluster;
- private MiniJournalCluster jCluster;
- private FileSystem fs;
- private FSNamesystem namesystem;
- private int editsPerformed = 0;
- private final String jid = "ns1";
-
- @Before
- public void setUpMiniCluster() throws IOException {
- final Configuration conf = new HdfsConfiguration();
- conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, true);
- conf.setLong(DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY, 1000L);
- qjmhaCluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(2)
- .build();
- dfsCluster = qjmhaCluster.getDfsCluster();
- jCluster = qjmhaCluster.getJournalCluster();
-
- dfsCluster.transitionToActive(0);
- fs = dfsCluster.getFileSystem(0);
- namesystem = dfsCluster.getNamesystem(0);
- }
-
- @After
- public void shutDownMiniCluster() throws IOException {
- if (qjmhaCluster != null) {
- qjmhaCluster.shutdown();
- }
- }
-
- @Test(timeout=30000)
- public void testJournalNodeSync() throws Exception {
- File firstJournalDir = jCluster.getJournalDir(0, jid);
- File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
- .getCurrentDir();
-
- // Generate some edit logs and delete one.
- long firstTxId = generateEditLog();
- generateEditLog();
-
- File missingLog = deleteEditLog(firstJournalCurrentDir, firstTxId);
-
- GenericTestUtils.waitFor(editLogExists(Lists.newArrayList(missingLog)),
- 500, 10000);
- }
-
- @Test(timeout=30000)
- public void testSyncForMultipleMissingLogs() throws Exception {
- File firstJournalDir = jCluster.getJournalDir(0, jid);
- File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
- .getCurrentDir();
-
- // Generate some edit logs and delete two.
- long firstTxId = generateEditLog();
- long nextTxId = generateEditLog();
-
- List<File> missingLogs = Lists.newArrayList();
- missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
- missingLogs.add(deleteEditLog(firstJournalCurrentDir, nextTxId));
-
- GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000);
- }
-
- @Test(timeout=30000)
- public void testSyncForDiscontinuousMissingLogs() throws Exception {
- File firstJournalDir = jCluster.getJournalDir(0, jid);
- File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
- .getCurrentDir();
-
- // Generate some edit logs and delete two discontinuous logs.
- long firstTxId = generateEditLog();
- generateEditLog();
- long nextTxId = generateEditLog();
-
- List<File> missingLogs = Lists.newArrayList();
- missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
- missingLogs.add(deleteEditLog(firstJournalCurrentDir, nextTxId));
-
- GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000);
- }
-
- @Test(timeout=30000)
- public void testMultipleJournalsMissingLogs() throws Exception {
- File firstJournalDir = jCluster.getJournalDir(0, jid);
- File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
- .getCurrentDir();
-
- File secondJournalDir = jCluster.getJournalDir(1, jid);
- StorageDirectory sd = new StorageDirectory(secondJournalDir);
- File secondJournalCurrentDir = sd.getCurrentDir();
-
- // Generate some edit logs and delete one log from two journals.
- long firstTxId = generateEditLog();
- generateEditLog();
-
- List<File> missingLogs = Lists.newArrayList();
- missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
- missingLogs.add(deleteEditLog(secondJournalCurrentDir, firstTxId));
-
- GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000);
- }
-
- @Test(timeout=60000)
- public void testMultipleJournalsMultipleMissingLogs() throws Exception {
- File firstJournalDir = jCluster.getJournalDir(0, jid);
- File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
- .getCurrentDir();
-
- File secondJournalDir = jCluster.getJournalDir(1, jid);
- File secondJournalCurrentDir = new StorageDirectory(secondJournalDir)
- .getCurrentDir();
-
- File thirdJournalDir = jCluster.getJournalDir(2, jid);
- File thirdJournalCurrentDir = new StorageDirectory(thirdJournalDir)
- .getCurrentDir();
-
- // Generate some edit logs and delete multiple logs in multiple journals.
- long firstTxId = generateEditLog();
- long secondTxId = generateEditLog();
- long thirdTxId = generateEditLog();
-
- List<File> missingLogs = Lists.newArrayList();
- missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
- missingLogs.add(deleteEditLog(secondJournalCurrentDir, firstTxId));
- missingLogs.add(deleteEditLog(secondJournalCurrentDir, secondTxId));
- missingLogs.add(deleteEditLog(thirdJournalCurrentDir, thirdTxId));
-
- GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
- }
-
- // Test JournalNode Sync by randomly deleting edit logs from one or two of
- // the journals.
- @Test(timeout=60000)
- public void testRandomJournalMissingLogs() throws Exception {
- Random randomJournal = new Random();
-
- List<File> journalCurrentDirs = Lists.newArrayList();
-
- for (int i = 0; i < 3; i++) {
- journalCurrentDirs.add(new StorageDirectory(jCluster.getJournalDir(i,
- jid)).getCurrentDir());
- }
-
- int count = 0;
- long lastStartTxId;
- int journalIndex;
- List<File> missingLogs = Lists.newArrayList();
- while (count < 5) {
- lastStartTxId = generateEditLog();
-
- // Delete the last edit log segment from randomly selected journal node
- journalIndex = randomJournal.nextInt(3);
- missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex),
- lastStartTxId));
-
- // Delete the last edit log segment from two journals for some logs
- if (count % 2 == 0) {
- journalIndex = (journalIndex + 1) % 3;
- missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex),
- lastStartTxId));
- }
-
- count++;
- }
-
- GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
- }
-
- private File deleteEditLog(File currentDir, long startTxId)
- throws IOException {
- EditLogFile logFile = getLogFile(currentDir, startTxId);
- while (logFile.isInProgress()) {
- dfsCluster.getNameNode(0).getRpcServer().rollEditLog();
- logFile = getLogFile(currentDir, startTxId);
- }
- File deleteFile = logFile.getFile();
- Assert.assertTrue("Couldn't delete edit log file", deleteFile.delete());
-
- return deleteFile;
- }
-
- /**
- * Do a mutative metadata operation on the file system.
- *
- * @return true if the operation was successful, false otherwise.
- */
- private boolean doAnEdit() throws IOException {
- return fs.mkdirs(new Path("/tmp", Integer.toString(editsPerformed++)));
- }
-
- /**
- * Does an edit and rolls the Edit Log.
- *
- * @return the startTxId of next segment after rolling edits.
- */
- private long generateEditLog() throws IOException {
- long startTxId = namesystem.getFSImage().getEditLog().getLastWrittenTxId();
- Assert.assertTrue("Failed to do an edit", doAnEdit());
- dfsCluster.getNameNode(0).getRpcServer().rollEditLog();
- return startTxId;
- }
-
- private Supplier<Boolean> editLogExists(List<File> editLogs) {
- Supplier<Boolean> supplier = new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- for (File editLog : editLogs) {
- if (!editLog.exists()) {
- return false;
- }
- }
- return true;
- }
- };
- return supplier;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbc6d254/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
index 9dd6846..28ec708 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
@@ -102,7 +102,7 @@ public class TestJournalNode {
@Test(timeout=100000)
public void testJournal() throws Exception {
MetricsRecordBuilder metrics = MetricsAsserts.getMetrics(
- journal.getMetricsForTests().getName());
+ journal.getMetrics().getName());
MetricsAsserts.assertCounter("BatchesWritten", 0L, metrics);
MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);
@@ -117,7 +117,7 @@ public class TestJournalNode {
ch.sendEdits(1L, 1, 1, "hello".getBytes(Charsets.UTF_8)).get();
metrics = MetricsAsserts.getMetrics(
- journal.getMetricsForTests().getName());
+ journal.getMetrics().getName());
MetricsAsserts.assertCounter("BatchesWritten", 1L, metrics);
MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);
@@ -130,7 +130,7 @@ public class TestJournalNode {
ch.sendEdits(1L, 2, 1, "goodbye".getBytes(Charsets.UTF_8)).get();
metrics = MetricsAsserts.getMetrics(
- journal.getMetricsForTests().getName());
+ journal.getMetrics().getName());
MetricsAsserts.assertCounter("BatchesWritten", 2L, metrics);
MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 1L, metrics);
MetricsAsserts.assertGauge("CurrentLagTxns", 98L, metrics);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbc6d254/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
new file mode 100644
index 0000000..2964f05
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
@@ -0,0 +1,439 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
+import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager
+ .getLogFile;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Unit test for Journal Node formatting upon re-installation and syncing.
+ */
+public class TestJournalNodeSync {
+ private Configuration conf;
+ private MiniQJMHACluster qjmhaCluster;
+ private MiniDFSCluster dfsCluster;
+ private MiniJournalCluster jCluster;
+ private FileSystem fs;
+ private FSNamesystem namesystem;
+ private int editsPerformed = 0;
+ private final String jid = "ns1";
+
+ @Rule
+ public TestName testName = new TestName();
+
+ @Before
+ public void setUpMiniCluster() throws IOException {
+ conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, true);
+ conf.setLong(DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY, 1000L);
+ if (testName.getMethodName().equals(
+ "testSyncAfterJNdowntimeWithoutQJournalQueue")) {
+ conf.setInt(DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY, 0);
+ }
+ qjmhaCluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(2)
+ .build();
+ dfsCluster = qjmhaCluster.getDfsCluster();
+ jCluster = qjmhaCluster.getJournalCluster();
+
+ dfsCluster.transitionToActive(0);
+ fs = dfsCluster.getFileSystem(0);
+ namesystem = dfsCluster.getNamesystem(0);
+ }
+
+ @After
+ public void shutDownMiniCluster() throws IOException {
+ if (qjmhaCluster != null) {
+ qjmhaCluster.shutdown();
+ }
+ }
+
+ @Test(timeout=30000)
+ public void testJournalNodeSync() throws Exception {
+ File firstJournalDir = jCluster.getJournalDir(0, jid);
+ File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+ .getCurrentDir();
+
+ // Generate some edit logs and delete one.
+ long firstTxId = generateEditLog();
+ generateEditLog();
+
+ File missingLog = deleteEditLog(firstJournalCurrentDir, firstTxId);
+
+ GenericTestUtils.waitFor(editLogExists(Lists.newArrayList(missingLog)),
+ 500, 10000);
+ }
+
+ @Test(timeout=30000)
+ public void testSyncForMultipleMissingLogs() throws Exception {
+ File firstJournalDir = jCluster.getJournalDir(0, jid);
+ File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+ .getCurrentDir();
+
+ // Generate some edit logs and delete two.
+ long firstTxId = generateEditLog();
+ long nextTxId = generateEditLog();
+
+ List<File> missingLogs = Lists.newArrayList();
+ missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
+ missingLogs.add(deleteEditLog(firstJournalCurrentDir, nextTxId));
+
+ GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000);
+ }
+
+ @Test(timeout=30000)
+ public void testSyncForDiscontinuousMissingLogs() throws Exception {
+ File firstJournalDir = jCluster.getJournalDir(0, jid);
+ File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+ .getCurrentDir();
+
+ // Generate some edit logs and delete two discontinuous logs.
+ long firstTxId = generateEditLog();
+ generateEditLog();
+ long nextTxId = generateEditLog();
+
+ List<File> missingLogs = Lists.newArrayList();
+ missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
+ missingLogs.add(deleteEditLog(firstJournalCurrentDir, nextTxId));
+
+ GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000);
+ }
+
+ @Test(timeout=30000)
+ public void testMultipleJournalsMissingLogs() throws Exception {
+ File firstJournalDir = jCluster.getJournalDir(0, jid);
+ File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+ .getCurrentDir();
+
+ File secondJournalDir = jCluster.getJournalDir(1, jid);
+ StorageDirectory sd = new StorageDirectory(secondJournalDir);
+ File secondJournalCurrentDir = sd.getCurrentDir();
+
+ // Generate some edit logs and delete one log from two journals.
+ long firstTxId = generateEditLog();
+ generateEditLog();
+
+ List<File> missingLogs = Lists.newArrayList();
+ missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
+ missingLogs.add(deleteEditLog(secondJournalCurrentDir, firstTxId));
+
+ GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000);
+ }
+
+ @Test(timeout=60000)
+ public void testMultipleJournalsMultipleMissingLogs() throws Exception {
+ File firstJournalDir = jCluster.getJournalDir(0, jid);
+ File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+ .getCurrentDir();
+
+ File secondJournalDir = jCluster.getJournalDir(1, jid);
+ File secondJournalCurrentDir = new StorageDirectory(secondJournalDir)
+ .getCurrentDir();
+
+ File thirdJournalDir = jCluster.getJournalDir(2, jid);
+ File thirdJournalCurrentDir = new StorageDirectory(thirdJournalDir)
+ .getCurrentDir();
+
+ // Generate some edit logs and delete multiple logs in multiple journals.
+ long firstTxId = generateEditLog();
+ long secondTxId = generateEditLog();
+ long thirdTxId = generateEditLog();
+
+ List<File> missingLogs = Lists.newArrayList();
+ missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
+ missingLogs.add(deleteEditLog(secondJournalCurrentDir, firstTxId));
+ missingLogs.add(deleteEditLog(secondJournalCurrentDir, secondTxId));
+ missingLogs.add(deleteEditLog(thirdJournalCurrentDir, thirdTxId));
+
+ GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
+ }
+
+ // Test JournalNode Sync by randomly deleting edit logs from one or two of
+ // the journals.
+ @Test(timeout=60000)
+ public void testRandomJournalMissingLogs() throws Exception {
+ Random randomJournal = new Random();
+
+ List<File> journalCurrentDirs = Lists.newArrayList();
+
+ for (int i = 0; i < 3; i++) {
+ journalCurrentDirs.add(new StorageDirectory(jCluster.getJournalDir(i,
+ jid)).getCurrentDir());
+ }
+
+ int count = 0;
+ long lastStartTxId;
+ int journalIndex;
+ List<File> missingLogs = Lists.newArrayList();
+ while (count < 5) {
+ lastStartTxId = generateEditLog();
+
+ // Delete the last edit log segment from randomly selected journal node
+ journalIndex = randomJournal.nextInt(3);
+ missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex),
+ lastStartTxId));
+
+ // Delete the last edit log segment from two journals for some logs
+ if (count % 2 == 0) {
+ journalIndex = (journalIndex + 1) % 3;
+ missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex),
+ lastStartTxId));
+ }
+
+ count++;
+ }
+
+ GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
+ }
+
+ // Test JournalNode Sync when a JN is down while NN is actively writing
+ // logs and comes back up after some time.
+ @Test (timeout=300_000)
+ public void testSyncAfterJNdowntime() throws Exception {
+ File firstJournalDir = jCluster.getJournalDir(0, jid);
+ File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+ .getCurrentDir();
+ File secondJournalDir = jCluster.getJournalDir(1, jid);
+ File secondJournalCurrentDir = new StorageDirectory(secondJournalDir)
+ .getCurrentDir();
+
+ long[] startTxIds = new long[10];
+
+ startTxIds[0] = generateEditLog();
+ startTxIds[1] = generateEditLog();
+
+ // Stop the first JN
+ jCluster.getJournalNode(0).stop(0);
+
+ // Roll some more edits while the first JN is down
+ for (int i = 2; i < 10; i++) {
+ startTxIds[i] = generateEditLog(5);
+ }
+
+ // Re-start the first JN
+ jCluster.restartJournalNode(0);
+
+ // Roll an edit to update the committed tx id of the first JN
+ generateEditLog();
+
+ // List the edit logs rolled during JN down time.
+ List<File> missingLogs = Lists.newArrayList();
+ for (int i = 2; i < 10; i++) {
+ EditLogFile logFile = getLogFile(secondJournalCurrentDir, startTxIds[i],
+ false);
+ missingLogs.add(new File(firstJournalCurrentDir,
+ logFile.getFile().getName()));
+ }
+
+ // Check that JNSync downloaded the edit logs rolled during JN down time.
+ GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
+ }
+
+ /**
+ * Test JournalNode Sync when a JN is down while NN is actively writing
+ * logs and comes back up after some time with no edit log queueing.
+ * Queuing is disabled for this test in {@link #setUpMiniCluster()}.
+ * @throws Exception on cluster operation failure or sync timeout
+ */
+ @Test (timeout=300_000)
+ public void testSyncAfterJNdowntimeWithoutQJournalQueue() throws Exception {
+ // DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY was set to 0 in setUpMiniCluster().
+ File firstJournalDir = jCluster.getJournalDir(0, jid);
+ File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+ .getCurrentDir();
+ File secondJournalDir = jCluster.getJournalDir(1, jid);
+ File secondJournalCurrentDir = new StorageDirectory(secondJournalDir)
+ .getCurrentDir();
+
+ long[] startTxIds = new long[10];
+
+ startTxIds[0] = generateEditLog();
+ startTxIds[1] = generateEditLog(2);
+
+ // Stop the first JN
+ jCluster.getJournalNode(0).stop(0);
+
+ // Roll some more edits while the first JN is down
+ for (int i = 2; i < 10; i++) {
+ startTxIds[i] = generateEditLog(5);
+ }
+
+ // Re-start the first JN
+ jCluster.restartJournalNode(0);
+
+ // After JN restart and before rolling another edit, the missing edit
+ // logs will not be synced as the committed tx id of the JN will be
+ // less than the start tx ids of the missing edit logs and edit log queuing
+ // has been disabled.
+ // Roll an edit to update the committed tx id of the first JN
+ generateEditLog(2);
+
+ // List the edit logs rolled during JN down time.
+ List<File> missingLogs = Lists.newArrayList();
+ for (int i = 2; i < 10; i++) {
+ EditLogFile logFile = getLogFile(secondJournalCurrentDir, startTxIds[i],
+ false);
+ missingLogs.add(new File(firstJournalCurrentDir,
+ logFile.getFile().getName()));
+ }
+
+ // Check that JNSync downloaded the edit logs rolled during JN down time.
+ GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
+
+ // Check that all 8 missing edit logs (startTxIds[2..9]) were downloaded
+ // by JournalNodeSyncer alone (edit log queueing has been disabled).
+ long numEditLogsSynced = jCluster.getJournalNode(0).getOrCreateJournal(jid)
+ .getMetrics().getNumEditLogsSynced().value();
+ Assert.assertTrue("Edit logs downloaded outside syncer. Expected 8 or " +
+ "more downloads, got " + numEditLogsSynced + " downloads instead",
+ numEditLogsSynced >= 8);
+ }
+
+ // Test JournalNode Sync when a JN is formatted while NN is actively writing
+ // logs.
+ @Test (timeout=300_000)
+ public void testSyncAfterJNformat() throws Exception {
+ File firstJournalDir = jCluster.getJournalDir(0, jid);
+ File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+ .getCurrentDir();
+ File secondJournalDir = jCluster.getJournalDir(1, jid);
+ File secondJournalCurrentDir = new StorageDirectory(secondJournalDir)
+ .getCurrentDir();
+
+ long[] startTxIds = new long[10];
+
+ startTxIds[0] = generateEditLog(1);
+ startTxIds[1] = generateEditLog(2);
+ startTxIds[2] = generateEditLog(4);
+ startTxIds[3] = generateEditLog(6);
+
+ Journal journal1 = jCluster.getJournalNode(0).getOrCreateJournal(jid);
+ NamespaceInfo nsInfo = journal1.getStorage().getNamespaceInfo();
+
+ // Wipe the first JN's current dir (best effort: delete() is unchecked)
+ for (File file : firstJournalCurrentDir.listFiles()) {
+ file.delete();
+ }
+
+ // Format the JN
+ journal1.format(nsInfo);
+
+ // Roll some more edits
+ for (int i = 4; i < 10; i++) {
+ startTxIds[i] = generateEditLog(5);
+ }
+
+ // List every edit log, including those rolled before the format.
+ List<File> missingLogs = Lists.newArrayList();
+ for (int i = 0; i < 10; i++) {
+ EditLogFile logFile = getLogFile(secondJournalCurrentDir, startTxIds[i],
+ false);
+ missingLogs.add(new File(firstJournalCurrentDir,
+ logFile.getFile().getName()));
+ }
+
+ // Check that the formatted JN has all the edit logs.
+ GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
+ }
+
+ private File deleteEditLog(File currentDir, long startTxId)
+ throws IOException {
+ EditLogFile logFile = getLogFile(currentDir, startTxId);
+ while (logFile.isInProgress()) {
+ dfsCluster.getNameNode(0).getRpcServer().rollEditLog();
+ logFile = getLogFile(currentDir, startTxId);
+ }
+ File deleteFile = logFile.getFile();
+ Assert.assertTrue("Couldn't delete edit log file", deleteFile.delete());
+
+ return deleteFile;
+ }
+
+ /**
+ * Do a mutative metadata operation on the file system.
+ *
+ * @return true if the operation was successful, false otherwise.
+ */
+ private boolean doAnEdit() throws IOException {
+ return fs.mkdirs(new Path("/tmp", Integer.toString(editsPerformed++)));
+ }
+
+ /**
+ * Does a single edit and rolls the Edit Log.
+ * @return the start txid of the segment containing the edit, i.e. the
+ * segment finalized by the roll (not the newly opened one).
+ */
+ private long generateEditLog() throws IOException {
+ return generateEditLog(1);
+ }
+
+ /**
+ * Does the specified number of edits and then rolls the Edit Log.
+ * @param numEdits number of edits to perform
+ * @return the start txid of the segment containing the edits, i.e. the
+ * segment finalized by the roll (not the newly opened one).
+ */
+ private long generateEditLog(int numEdits) throws IOException {
+ long startTxId = namesystem.getFSImage().getEditLog().getLastWrittenTxId();
+ for (int i = 1; i <= numEdits; i++) {
+ Assert.assertTrue("Failed to do an edit", doAnEdit());
+ }
+ dfsCluster.getNameNode(0).getRpcServer().rollEditLog();
+ return startTxId;
+
+ private Supplier<Boolean> editLogExists(List<File> editLogs) {
+ Supplier<Boolean> supplier = new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ for (File editLog : editLogs) {
+ if (!editLog.exists()) {
+ return false;
+ }
+ }
+ return true;
+ }
+ };
+ return supplier;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org