You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vv...@apache.org on 2015/09/07 17:46:37 UTC
[35/50] [abbrv] hadoop git commit: HDFS-8964. When validating the edit log, do not read at or beyond the file offset that is being written (Zhe Zhang via Colin P. McCabe)
HDFS-8964. When validating the edit log, do not read at or beyond the file offset that is being written (Zhe Zhang via Colin P. McCabe)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/53c38cc8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/53c38cc8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/53c38cc8
Branch: refs/heads/YARN-3926
Commit: 53c38cc89ab979ec47557dcfa7affbad20578c0a
Parents: 524ba87
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Thu Sep 3 11:22:47 2015 -0700
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Thu Sep 3 11:22:47 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hdfs/server/namenode/FSEditLogTestUtil.java | 3 +-
.../hadoop/hdfs/qjournal/server/Journal.java | 22 ++--
.../server/namenode/EditLogFileInputStream.java | 15 ++-
.../hadoop/hdfs/server/namenode/FSEditLog.java | 10 ++
.../hdfs/server/namenode/FSEditLogLoader.java | 12 ++-
.../server/namenode/FileJournalManager.java | 39 +++++--
.../hdfs/server/namenode/SecondaryNameNode.java | 2 +-
.../TestCheckPointForSecurityTokens.java | 4 +-
.../hdfs/server/namenode/TestEditLog.java | 103 ++++++++++++++++++-
.../server/namenode/TestFSEditLogLoader.java | 13 ++-
11 files changed, 199 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53c38cc8/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 275dce2..afc6cf4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1289,6 +1289,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9009. Send metrics logs to NullAppender by default. (Arpit Agarwal)
+ HDFS-8964. When validating the edit log, do not read at or beyond the file
+ offset that is being written (Zhe Zhang via Colin P. McCabe)
+
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53c38cc8/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java
index a46f9cf..e5b9d01 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java
@@ -33,7 +33,8 @@ public class FSEditLogTestUtil {
public static long countTransactionsInStream(EditLogInputStream in)
throws IOException {
- FSEditLogLoader.EditLogValidation validation = FSEditLogLoader.validateEditLog(in);
+ FSEditLogLoader.EditLogValidation validation =
+ FSEditLogLoader.validateEditLog(in, Long.MAX_VALUE);
return (validation.getEndTxId() - in.getFirstTxId()) + 1;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53c38cc8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
index 2953055..813f267 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
@@ -151,7 +151,7 @@ public class Journal implements Closeable {
EditLogFile latest = scanStorageForLatestEdits();
if (latest != null) {
- highestWrittenTxId = latest.getLastTxId();
+ updateHighestWrittenTxId(latest.getLastTxId());
}
}
@@ -266,7 +266,17 @@ public class Journal implements Closeable {
synchronized long getHighestWrittenTxId() {
return highestWrittenTxId;
}
-
+
+ /**
+ * Update the highest Tx ID that has been written to the journal. Also update
+ * the {@link FileJournalManager#lastReadableTxId} of the underlying fjm.
+ * @param val The new value
+ */
+ private void updateHighestWrittenTxId(long val) {
+ highestWrittenTxId = val;
+ fjm.setLastReadableTxId(val);
+ }
+
@VisibleForTesting
JournalMetrics getMetricsForTests() {
return metrics;
@@ -399,7 +409,7 @@ public class Journal implements Closeable {
metrics.bytesWritten.incr(records.length);
metrics.txnsWritten.incr(numTxns);
- highestWrittenTxId = lastTxnId;
+ updateHighestWrittenTxId(lastTxnId);
nextTxId = lastTxnId + 1;
}
@@ -782,8 +792,8 @@ public class Journal implements Closeable {
": no current segment in place");
// Update the highest txid for lag metrics
- highestWrittenTxId = Math.max(segment.getEndTxId(),
- highestWrittenTxId);
+ updateHighestWrittenTxId(Math.max(segment.getEndTxId(),
+ highestWrittenTxId));
} else {
LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
": old segment " + TextFormat.shortDebugString(currentSegment) +
@@ -812,7 +822,7 @@ public class Journal implements Closeable {
// If we're shortening the log, update our highest txid
// used for lag metrics.
if (txnRange(currentSegment).containsLong(highestWrittenTxId)) {
- highestWrittenTxId = segment.getEndTxId();
+ updateHighestWrittenTxId(segment.getEndTxId());
}
}
syncedFile = syncLog(reqInfo, segment, fromUrl);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53c38cc8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
index 73a162e..3bf0ab4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
@@ -300,8 +300,17 @@ public class EditLogFileInputStream extends EditLogInputStream {
return getName();
}
- static FSEditLogLoader.EditLogValidation validateEditLog(File file)
- throws IOException {
+ /**
+ * @param file File being validated.
+ * @param maxTxIdToValidate Maximum Tx ID to try to validate. Validation
+ * returns after reading this or a higher ID.
+ * The file portion beyond this ID is potentially
+ * being updated.
+ * @return Result of the validation
+ * @throws IOException
+ */
+ static FSEditLogLoader.EditLogValidation validateEditLog(File file,
+ long maxTxIdToValidate) throws IOException {
EditLogFileInputStream in;
try {
in = new EditLogFileInputStream(file);
@@ -314,7 +323,7 @@ public class EditLogFileInputStream extends EditLogInputStream {
}
try {
- return FSEditLogLoader.validateEditLog(in);
+ return FSEditLogLoader.validateEditLog(in, maxTxIdToValidate);
} finally {
IOUtils.closeStream(in);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53c38cc8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index edf88c9..e255cff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -674,6 +674,16 @@ public class FSEditLog implements LogsPurgeable {
synchronized (this) {
if (sync) {
synctxid = syncStart;
+ for (JournalManager jm : journalSet.getJournalManagers()) {
+ /**
+ * {@link FileJournalManager#lastReadableTxId} is only meaningful
+ * for file-based journals. Therefore the interface is not added to
+ * other types of {@link JournalManager}.
+ */
+ if (jm instanceof FileJournalManager) {
+ ((FileJournalManager)jm).setLastReadableTxId(syncStart);
+ }
+ }
isSyncRunning = false;
}
this.notifyAll();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53c38cc8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index fc0bb78..bb36ca2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -1112,8 +1112,14 @@ public class FSEditLogLoader {
* If there are invalid or corrupt transactions in the middle of the stream,
* validateEditLog will skip over them.
* This reads through the stream but does not close it.
+ *
+ * @param maxTxIdToValidate Maximum Tx ID to try to validate. Validation
+ * returns after reading this or a higher ID.
+ * The file portion beyond this ID is potentially
+ * being updated.
*/
- static EditLogValidation validateEditLog(EditLogInputStream in) {
+ static EditLogValidation validateEditLog(EditLogInputStream in,
+ long maxTxIdToValidate) {
long lastPos = 0;
long lastTxId = HdfsServerConstants.INVALID_TXID;
long numValid = 0;
@@ -1136,6 +1142,10 @@ public class FSEditLogLoader {
|| op.getTransactionId() > lastTxId) {
lastTxId = op.getTransactionId();
}
+ if (lastTxId >= maxTxIdToValidate) {
+ break;
+ }
+
numValid++;
}
return new EditLogValidation(lastPos, lastTxId, false);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53c38cc8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
index ebd7475..a1488eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
@@ -76,6 +76,15 @@ public class FileJournalManager implements JournalManager {
private File currentInProgress = null;
+ /**
+ * A FileJournalManager should maintain the largest Tx ID that has been
+ * safely written to its edit log files.
+ * It should prevent readers from reading beyond this ID, to avoid a potential
+ * race with ongoing writers.
+ * Initial value indicates that all transactions can be read.
+ */
+ private long lastReadableTxId = Long.MAX_VALUE;
+
@VisibleForTesting
StoragePurger purger
= new NNStorageRetentionManager.DeletionStoragePurger();
@@ -159,6 +168,15 @@ public class FileJournalManager implements JournalManager {
this.outputBufferCapacity = size;
}
+
+ public long getLastReadableTxId() {
+ return lastReadableTxId;
+ }
+
+ public void setLastReadableTxId(long id) {
+ this.lastReadableTxId = id;
+ }
+
@Override
public void purgeLogsOlderThan(long minTxIdToKeep)
throws IOException {
@@ -193,7 +211,7 @@ public class FileJournalManager implements JournalManager {
}
if (elf.isInProgress()) {
try {
- elf.validateLog();
+ elf.validateLog(getLastReadableTxId());
} catch (IOException e) {
LOG.error("got IOException while trying to validate header of " +
elf + ". Skipping.", e);
@@ -325,11 +343,13 @@ public class FileJournalManager implements JournalManager {
(inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") +
"from among " + elfs.size() + " candidate file(s)");
}
- addStreamsToCollectionFromFiles(elfs, streams, fromTxId, inProgressOk);
+ addStreamsToCollectionFromFiles(elfs, streams, fromTxId,
+ getLastReadableTxId(), inProgressOk);
}
static void addStreamsToCollectionFromFiles(Collection<EditLogFile> elfs,
- Collection<EditLogInputStream> streams, long fromTxId, boolean inProgressOk) {
+ Collection<EditLogInputStream> streams, long fromTxId, long maxTxIdToValidate,
+ boolean inProgressOk) {
for (EditLogFile elf : elfs) {
if (elf.isInProgress()) {
if (!inProgressOk) {
@@ -340,7 +360,7 @@ public class FileJournalManager implements JournalManager {
continue;
}
try {
- elf.validateLog();
+ elf.validateLog(maxTxIdToValidate);
} catch (IOException e) {
LOG.error("got IOException while trying to validate header of " +
elf + ". Skipping.", e);
@@ -384,7 +404,7 @@ public class FileJournalManager implements JournalManager {
continue;
}
- elf.validateLog();
+ elf.validateLog(getLastReadableTxId());
if (elf.hasCorruptHeader()) {
elf.moveAsideCorruptFile();
@@ -516,9 +536,14 @@ public class FileJournalManager implements JournalManager {
* Find out where the edit log ends.
* This will update the lastTxId of the EditLogFile or
* mark it as corrupt if it is.
+ * @param maxTxIdToValidate Maximum Tx ID to try to validate. Validation
+ * returns after reading this or a higher ID.
+ * The file portion beyond this ID is potentially
+ * being updated.
*/
- public void validateLog() throws IOException {
- EditLogValidation val = EditLogFileInputStream.validateEditLog(file);
+ public void validateLog(long maxTxIdToValidate) throws IOException {
+ EditLogValidation val = EditLogFileInputStream.validateEditLog(file,
+ maxTxIdToValidate);
this.lastTxId = val.getEndTxId();
this.hasCorruptHeader = val.hasCorruptHeader();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53c38cc8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
index 2267853..e3e0a7d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
@@ -907,7 +907,7 @@ public class SecondaryNameNode implements Runnable,
throw new RuntimeException(ioe);
}
FileJournalManager.addStreamsToCollectionFromFiles(editFiles, streams,
- fromTxId, inProgressOk);
+ fromTxId, Long.MAX_VALUE, inProgressOk);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53c38cc8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java
index 9401d07..d5e64ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java
@@ -88,7 +88,7 @@ public class TestCheckPointForSecurityTokens {
for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) {
EditLogFile log = FSImageTestUtil.findLatestEditsLog(sd);
assertTrue(log.isInProgress());
- log.validateLog();
+ log.validateLog(Long.MAX_VALUE);
long numTransactions = (log.getLastTxId() - log.getFirstTxId()) + 1;
assertEquals("In-progress log " + log + " should have 5 transactions",
5, numTransactions);;
@@ -105,7 +105,7 @@ public class TestCheckPointForSecurityTokens {
for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) {
EditLogFile log = FSImageTestUtil.findLatestEditsLog(sd);
assertTrue(log.isInProgress());
- log.validateLog();
+ log.validateLog(Long.MAX_VALUE);
long numTransactions = (log.getLastTxId() - log.getFirstTxId()) + 1;
assertEquals("In-progress log " + log + " should only have START txn",
1, numTransactions);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53c38cc8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
index e59dec4..0495860 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
@@ -66,6 +66,8 @@ import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -83,6 +85,9 @@ import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.spi.LoggingEvent;
import org.junit.Test;
import org.mockito.Mockito;
import org.xml.sax.ContentHandler;
@@ -1223,7 +1228,8 @@ public class TestEditLog {
TXNS_PER_ROLL*11);
for (EditLogInputStream edits : editStreams) {
- FSEditLogLoader.EditLogValidation val = FSEditLogLoader.validateEditLog(edits);
+ FSEditLogLoader.EditLogValidation val =
+ FSEditLogLoader.validateEditLog(edits, Long.MAX_VALUE);
long read = (val.getEndTxId() - edits.getFirstTxId()) + 1;
LOG.info("Loading edits " + edits + " read " + read);
assertEquals(startTxId, edits.getFirstTxId());
@@ -1573,4 +1579,99 @@ public class TestEditLog {
}
}
}
+
+ class TestAppender extends AppenderSkeleton {
+ private final List<LoggingEvent> log = new ArrayList<>();
+
+ @Override
+ public boolean requiresLayout() {
+ return false;
+ }
+
+ @Override
+ protected void append(final LoggingEvent loggingEvent) {
+ log.add(loggingEvent);
+ }
+
+ @Override
+ public void close() {
+ }
+
+ public List<LoggingEvent> getLog() {
+ return new ArrayList<>(log);
+ }
+ }
+
+ /**
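+ * Checks that polling inotify events while the in-progress edit log has unsynced (here: corrupted) trailing bytes logs no read error past the last synced op.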
+ * @throws Exception
+ */
+ @Test
+ public void testReadActivelyUpdatedLog() throws Exception {
+ final TestAppender appender = new TestAppender();
+ LogManager.getRootLogger().addAppender(appender);
+ Configuration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
+ // Set single handler thread, so all transactions hit same thread-local ops.
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, 1);
+ MiniDFSCluster cluster = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ FSImage fsimage = cluster.getNamesystem().getFSImage();
+ StorageDirectory sd = fsimage.getStorage().getStorageDir(0);
+
+ final DistributedFileSystem fileSys = cluster.getFileSystem();
+ DFSInotifyEventInputStream events = fileSys.getInotifyEventStream();
+ fileSys.mkdirs(new Path("/test"));
+ fileSys.mkdirs(new Path("/test/dir1"));
+ fileSys.delete(new Path("/test/dir1"), true);
+ fsimage.getEditLog().logSync();
+ fileSys.mkdirs(new Path("/test/dir2"));
+
+
+ final File inProgressEdit = NNStorage.getInProgressEditsFile(sd, 1);
+ assertTrue(inProgressEdit.exists());
+ EditLogFileInputStream elis = new EditLogFileInputStream(inProgressEdit);
+ FSEditLogOp op;
+ long pos = 0;
+
+ while (true) {
+ op = elis.readOp();
+ if (op != null && op.opCode != FSEditLogOpCodes.OP_INVALID) {
+ pos = elis.getPosition();
+ } else {
+ break;
+ }
+ }
+ elis.close();
+ assertTrue(pos > 0);
+
+ RandomAccessFile rwf = new RandomAccessFile(inProgressEdit, "rw");
+ rwf.seek(pos);
+ assertEquals(rwf.readByte(), (byte) -1);
+
+ rwf.seek(pos + 1);
+ rwf.writeByte(2);
+
+ rwf.close();
+
+ events.poll();
+ String pattern = "Caught exception after reading (.*) ops";
+ Pattern r = Pattern.compile(pattern);
+ final List<LoggingEvent> log = appender.getLog();
+ for (LoggingEvent event : log) {
+ Matcher m = r.matcher(event.getRenderedMessage());
+ if (m.find()) {
+ fail("Should not try to read past latest syned edit log op");
+ }
+ }
+
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ LogManager.getRootLogger().removeAppender(appender);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53c38cc8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
index 55ba379..3c3423a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
@@ -318,7 +318,8 @@ public class TestFSEditLogLoader {
} finally {
rwf.close();
}
- EditLogValidation validation = EditLogFileInputStream.validateEditLog(logFile);
+ EditLogValidation validation =
+ EditLogFileInputStream.validateEditLog(logFile, Long.MAX_VALUE);
assertTrue(validation.hasCorruptHeader());
}
@@ -333,7 +334,7 @@ public class TestFSEditLogLoader {
File logFileBak = new File(testDir, logFile.getName() + ".bak");
Files.copy(logFile, logFileBak);
EditLogValidation validation =
- EditLogFileInputStream.validateEditLog(logFile);
+ EditLogFileInputStream.validateEditLog(logFile, Long.MAX_VALUE);
assertTrue(!validation.hasCorruptHeader());
// We expect that there will be an OP_START_LOG_SEGMENT, followed by
// NUM_TXNS opcodes, followed by an OP_END_LOG_SEGMENT.
@@ -346,7 +347,8 @@ public class TestFSEditLogLoader {
// Restore backup, corrupt the txn opcode
Files.copy(logFileBak, logFile);
corruptByteInFile(logFile, txOffset);
- validation = EditLogFileInputStream.validateEditLog(logFile);
+ validation = EditLogFileInputStream.validateEditLog(logFile,
+ Long.MAX_VALUE);
long expectedEndTxId = (txId == (NUM_TXNS + 1)) ?
NUM_TXNS : (NUM_TXNS + 1);
assertEquals("Failed when corrupting txn opcode at " + txOffset,
@@ -363,7 +365,8 @@ public class TestFSEditLogLoader {
// Restore backup, corrupt the txn opcode
Files.copy(logFileBak, logFile);
truncateFile(logFile, txOffset);
- validation = EditLogFileInputStream.validateEditLog(logFile);
+ validation = EditLogFileInputStream.validateEditLog(logFile,
+ Long.MAX_VALUE);
long expectedEndTxId = (txId == 0) ?
HdfsServerConstants.INVALID_TXID : (txId - 1);
assertEquals("Failed when corrupting txid " + txId + " txn opcode " +
@@ -381,7 +384,7 @@ public class TestFSEditLogLoader {
// layout flags section.
truncateFile(logFile, 8);
EditLogValidation validation =
- EditLogFileInputStream.validateEditLog(logFile);
+ EditLogFileInputStream.validateEditLog(logFile, Long.MAX_VALUE);
assertTrue(!validation.hasCorruptHeader());
assertEquals(HdfsServerConstants.INVALID_TXID, validation.getEndTxId());
}