You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/08/15 02:57:25 UTC
svn commit: r1373183 - in
/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/qjournal/client/
src/main/java/org/apache/hadoop/hdfs/qjournal/server/
src/main/java/org/apache/hadoop/hdfs/server/name...
Author: todd
Date: Wed Aug 15 00:57:24 2012
New Revision: 1373183
URL: http://svn.apache.org/viewvc?rev=1373183&view=rev
Log:
HDFS-3799. QJM: handle empty log segments during recovery. Contributed by Todd Lipcon.
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt?rev=1373183&r1=1373182&r2=1373183&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt Wed Aug 15 00:57:24 2012
@@ -20,3 +20,5 @@ HDFS-3793. Implement genericized format(
HDFS-3795. QJM: validate journal dir at startup (todd)
HDFS-3798. Avoid throwing NPE when finalizeSegment() is called on invalid segment (todd)
+
+HDFS-3799. QJM: handle empty log segments during recovery (todd)
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java?rev=1373183&r1=1373182&r2=1373183&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java Wed Aug 15 00:57:24 2012
@@ -23,7 +23,9 @@ import java.net.MalformedURLException;
import java.net.URL;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -111,15 +113,8 @@ public class IPCLoggerChannel implements
DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_DEFAULT);
executor = MoreExecutors.listeningDecorator(
- Executors.newSingleThreadExecutor(
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("Logger channel to " + addr)
- .setUncaughtExceptionHandler(
- UncaughtExceptionHandlers.systemExit())
- .build()));
+ createExecutor());
}
-
@Override
public synchronized void setEpoch(long epoch) {
this.epoch = epoch;
@@ -154,6 +149,21 @@ public class IPCLoggerChannel implements
return new QJournalProtocolTranslatorPB(pbproxy);
}
+
+ /**
+ * Separated out for easy overriding in tests.
+ */
+ @VisibleForTesting
+ protected ExecutorService createExecutor() {
+ return Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("Logger channel to " + addr)
+ .setUncaughtExceptionHandler(
+ UncaughtExceptionHandlers.systemExit())
+ .build());
+ }
+
@Override
public URL buildURLToFetchLogs(long segmentTxId) {
Preconditions.checkArgument(segmentTxId > 0,
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java?rev=1373183&r1=1373182&r2=1373183&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java Wed Aug 15 00:57:24 2012
@@ -235,15 +235,30 @@ public class QuorumJournalManager implem
} else if (bestResponse.hasSegmentState()) {
LOG.info("Using longest log: " + bestEntry);
} else {
- // TODO: can we get here? what about the following case:
- // - 3 JNs, JN1, JN2, JN3
- // - writer starts segment 101 on JN1, then crashes
- // - during newEpoch(), we saw the segment on JN1 and decide to recover segment 101
- // - during prepare(), JN1 has actually crashed, and we only talk to JN2 and JN3,
+ // None of the responses to prepareRecovery() had a segment at the given
+ // txid. This can happen for example in the following situation:
+ // - 3 JNs: JN1, JN2, JN3
+ // - writer starts segment 101 on JN1, then crashes before
+ // writing to JN2 and JN3
+ // - during newEpoch(), we saw the segment on JN1 and decide to
+ // recover segment 101
+ // - before prepare(), JN1 crashes, and we only talk to JN2 and JN3,
// neither of which has any entry for this log.
- // Write a test case.
- throw new AssertionError("None of the responses " +
- "had a log to recover: " + QuorumCall.mapToString(prepareResponses));
+ // In this case, it is allowed to do nothing for recovery, since the
+ // segment wasn't started on a quorum of nodes.
+
+ // Sanity check: we should only get here if none of the responses had
+ // a log. This should be a postcondition of the recovery comparator,
+ // but a bug in the comparator might cause us to get here.
+ for (PrepareRecoveryResponseProto resp : prepareResponses.values()) {
+ assert !resp.hasSegmentState() :
+ "One of the loggers had a response, but no best logger " +
+ "was found.";
+ }
+
+ LOG.info("None of the responders had a log to recover: " +
+ QuorumCall.mapToString(prepareResponses));
+ return;
}
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java?rev=1373183&r1=1373182&r2=1373183&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java Wed Aug 15 00:57:24 2012
@@ -101,9 +101,21 @@ class Journal implements Closeable {
}
LOG.info("Scanning storage " + fjm);
List<EditLogFile> files = fjm.getLogFiles(0);
- if (!files.isEmpty()) {
- EditLogFile latestLog = files.get(files.size() - 1);
- LOG.info("Latest log is " + latestLog);
+ if (files.isEmpty()) {
+ curSegmentTxId = HdfsConstants.INVALID_TXID;
+ return;
+ }
+
+ EditLogFile latestLog = files.get(files.size() - 1);
+ latestLog.validateLog();
+ LOG.info("Latest log is " + latestLog);
+ if (latestLog.getLastTxId() == HdfsConstants.INVALID_TXID) {
+ // the log contains no transactions
+ LOG.warn("Latest log " + latestLog + " has no transactions. " +
+ "moving it aside");
+ latestLog.moveAsideEmptyFile();
+ curSegmentTxId = HdfsConstants.INVALID_TXID;
+ } else {
curSegmentTxId = latestLog.getFirstTxId();
}
}
@@ -166,6 +178,7 @@ class Journal implements Closeable {
if (curSegment != null) {
curSegment.close();
curSegment = null;
+ curSegmentTxId = HdfsConstants.INVALID_TXID;
}
NewEpochResponseProto.Builder builder =
@@ -248,10 +261,37 @@ class Journal implements Closeable {
checkRequest(reqInfo);
checkFormatted();
- Preconditions.checkState(curSegment == null,
- "Can't start a log segment, already writing " + curSegment);
- Preconditions.checkState(nextTxId == txid || nextTxId == HdfsConstants.INVALID_TXID,
- "Can't start log segment " + txid + " expecting nextTxId=" + nextTxId);
+ if (curSegment != null) {
+ LOG.warn("Client is requesting a new log segment " + txid +
+ " though we are already writing " + curSegment + ". " +
+ "Aborting the current segment in order to begin the new one.");
+ // The writer may have lost a connection to us and is now
+ // re-connecting after the connection came back.
+ // We should abort our own old segment.
+ curSegment.abort();
+ curSegment = null;
+ }
+
+ // Paranoid sanity check: we should never overwrite a finalized log file.
+ // Additionally, if it's in-progress, it should have at most 1 transaction.
+ // This can happen if the writer crashes exactly at the start of a segment.
+ EditLogFile existing = fjm.getLogFile(txid);
+ if (existing != null) {
+ if (!existing.isInProgress()) {
+ throw new IllegalStateException("Already have a finalized segment " +
+ existing + " beginning at " + txid);
+ }
+
+ // If it's in-progress, it should only contain one transaction,
+ // because the "startLogSegment" transaction is written alone at the
+ // start of each segment.
+ existing.validateLog();
+ if (existing.getLastTxId() != existing.getFirstTxId()) {
+ throw new IllegalStateException("The log file " +
+ existing + " seems to contain valid transactions");
+ }
+ }
+
curSegment = fjm.startLogSegment(txid);
curSegmentTxId = txid;
nextTxId = txid;
@@ -360,9 +400,10 @@ class Journal implements Closeable {
elf.validateLog();
}
if (elf.getLastTxId() == HdfsConstants.INVALID_TXID) {
- // no transactions in file
- throw new AssertionError("TODO: no transactions in file " +
- elf);
+ LOG.info("Edit log file " + elf + " appears to be empty. " +
+ "Moving it aside...");
+ elf.moveAsideEmptyFile();
+ return null;
}
SegmentStateProto ret = SegmentStateProto.newBuilder()
.setStartTxId(segmentTxId)
@@ -433,13 +474,16 @@ class Journal implements Closeable {
}
SegmentStateProto currentSegment = getSegmentInfo(segmentTxId);
- // TODO: this can be null, in the case that one of the loggers started
- // the next segment, but others did not! add regression test and null
- // check in next condition below.
-
- // TODO: what if they have the same length but one is finalized and the
- // other isn't! cover that case.
- if (currentSegment.getEndTxId() != segment.getEndTxId()) {
+ if (currentSegment == null ||
+ currentSegment.getEndTxId() != segment.getEndTxId()) {
+ if (currentSegment == null) {
+ LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
+ ": no current segment in place");
+ } else {
+ LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
+ ": old segment " + TextFormat.shortDebugString(currentSegment) + " is " +
+ "not the right length");
+ }
syncLog(reqInfo, segment, fromUrl);
} else {
LOG.info("Skipping download of log " +
@@ -481,14 +525,6 @@ class Journal implements Closeable {
try {
success = tmpFile.renameTo(storage.getInProgressEditLog(
segment.getStartTxId()));
- if (success) {
- // If we're synchronizing the latest segment, update our cached
- // info.
- // TODO: can this be done more generally?
- if (curSegmentTxId == segment.getStartTxId()) {
- nextTxId = segment.getEndTxId() + 1;
- }
- }
} finally {
if (!success) {
if (!tmpFile.delete()) {
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1373183&r1=1373182&r2=1373183&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Wed Aug 15 00:57:24 2012
@@ -460,7 +460,7 @@ public class FileJournalManager implemen
renameSelf(".corrupt");
}
- void moveAsideEmptyFile() throws IOException {
+ public void moveAsideEmptyFile() throws IOException {
assert lastTxId == HdfsConstants.INVALID_TXID;
renameSelf(".empty");
}
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java?rev=1373183&r1=1373182&r2=1373183&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java Wed Aug 15 00:57:24 2012
@@ -30,6 +30,8 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -54,6 +56,7 @@ import org.junit.Test;
import org.mockito.Mockito;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.MoreExecutors;
/**
* Functional tests for QuorumJournalManager.
@@ -206,6 +209,124 @@ public class TestQuorumJournalManager {
}
}
+ /**
+ * Test the case where the NN crashes after starting a new segment
+ * on all nodes, but before writing the first transaction to it.
+ */
+ @Test
+ public void testCrashAtBeginningOfSegment() throws Exception {
+ writeSegment(cluster, qjm, 1, 3, true);
+ waitForAllPendingCalls(qjm.getLoggerSetForTests());
+
+ EditLogOutputStream stm = qjm.startLogSegment(4);
+ try {
+ waitForAllPendingCalls(qjm.getLoggerSetForTests());
+ } finally {
+ stm.abort();
+ }
+
+
+ // Make a new QJM
+ qjm = new QuorumJournalManager(
+ conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO);
+ qjm.recoverUnfinalizedSegments();
+ checkRecovery(cluster, 1, 3);
+
+ writeSegment(cluster, qjm, 4, 3, true);
+ }
+
+ @Test
+ public void testOutOfSyncAtBeginningOfSegment0() throws Exception {
+ doTestOutOfSyncAtBeginningOfSegment(0);
+ }
+
+ @Test
+ public void testOutOfSyncAtBeginningOfSegment1() throws Exception {
+ doTestOutOfSyncAtBeginningOfSegment(1);
+ }
+
+ @Test
+ public void testOutOfSyncAtBeginningOfSegment2() throws Exception {
+ doTestOutOfSyncAtBeginningOfSegment(2);
+ }
+
+ /**
+ * Test the case where, at the beginning of a segment, transactions
+ * have been written to one JN but not others.
+ */
+ public void doTestOutOfSyncAtBeginningOfSegment(int nodeWithOneTxn)
+ throws Exception {
+
+ int nodeWithEmptySegment = (nodeWithOneTxn + 1) % 3;
+ int nodeMissingSegment = (nodeWithOneTxn + 2) % 3;
+
+ writeSegment(cluster, qjm, 1, 3, true);
+ waitForAllPendingCalls(qjm.getLoggerSetForTests());
+ cluster.getJournalNode(nodeMissingSegment).stopAndJoin(0);
+
+ // Open segment on 2/3 nodes
+ EditLogOutputStream stm = qjm.startLogSegment(4);
+ try {
+ waitForAllPendingCalls(qjm.getLoggerSetForTests());
+
+ // Write transactions to only 1/3 nodes
+ failLoggerAtTxn(spies.get(nodeWithEmptySegment), 4);
+ try {
+ writeTxns(stm, 4, 1);
+ fail("Did not fail even though 2/3 failed");
+ } catch (QuorumException qe) {
+ GenericTestUtils.assertExceptionContains("mock failure", qe);
+ }
+ } finally {
+ stm.abort();
+ }
+
+ // Bring back the down JN.
+ cluster.restartJournalNode(nodeMissingSegment);
+
+ // Make a new QJM. At this point, the state is as follows:
+ // A: nodeWithEmptySegment: 1-3 finalized, 4_inprogress (empty)
+ // B: nodeWithOneTxn: 1-3 finalized, 4_inprogress (1 txn)
+ // C: nodeMissingSegment: 1-3 finalized
+ GenericTestUtils.assertGlobEquals(
+ cluster.getCurrentDir(nodeWithEmptySegment, JID),
+ "edits_.*",
+ NNStorage.getFinalizedEditsFileName(1, 3),
+ NNStorage.getInProgressEditsFileName(4));
+ GenericTestUtils.assertGlobEquals(
+ cluster.getCurrentDir(nodeWithOneTxn, JID),
+ "edits_.*",
+ NNStorage.getFinalizedEditsFileName(1, 3),
+ NNStorage.getInProgressEditsFileName(4));
+ GenericTestUtils.assertGlobEquals(
+ cluster.getCurrentDir(nodeMissingSegment, JID),
+ "edits_.*",
+ NNStorage.getFinalizedEditsFileName(1, 3));
+
+
+ // Stop one of the nodes. Since we run this test three
+ // times, rotating the roles of the nodes, we'll test
+ // all the permutations.
+ cluster.getJournalNode(2).stopAndJoin(0);
+
+ qjm = createSpyingQJM();
+ qjm.recoverUnfinalizedSegments();
+
+ if (nodeWithOneTxn == 0 ||
+ nodeWithOneTxn == 1) {
+ // If the node that had the transaction committed was one of the nodes
+ // that responded during recovery, then we should have recovered txid
+ // 4.
+ checkRecovery(cluster, 4, 4);
+ writeSegment(cluster, qjm, 5, 3, true);
+ } else {
+ // Otherwise, we should have recovered only 1-3 and should be able to
+ // start a segment at 4.
+ checkRecovery(cluster, 1, 3);
+ writeSegment(cluster, qjm, 4, 3, true);
+ }
+ }
+
/**
* Test case where a new writer picks up from an old one with no failures
@@ -408,8 +529,15 @@ public class TestQuorumJournalManager {
@Override
public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
String journalId, InetSocketAddress addr) {
- return Mockito.spy(IPCLoggerChannel.FACTORY.createLogger(
- conf, nsInfo, journalId, addr));
+ AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId, addr) {
+ protected ExecutorService createExecutor() {
+ // Don't parallelize calls to the quorum in the tests.
+ // This makes the tests more deterministic.
+ return MoreExecutors.sameThreadExecutor();
+ }
+ };
+
+ return Mockito.spy(logger);
}
};
return new QuorumJournalManager(
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java?rev=1373183&r1=1373182&r2=1373183&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java Wed Aug 15 00:57:24 2012
@@ -204,6 +204,89 @@ public class TestJournal {
"No log file to finalize at transaction ID 1000", ise);
}
}
+
+ /**
+ * Assume that a client is writing to a journal, but loses its connection
+ * in the middle of a segment. Thus, any future journal() calls in that
+ * segment may fail, because some txns were missed while the connection was
+ * down.
+ *
+ * Eventually, the connection comes back, and the NN tries to start a new
+ * segment at a higher txid. This should abort the old one and succeed.
+ */
+ @Test
+ public void testAbortOldSegmentIfFinalizeIsMissed() throws Exception {
+ journal.newEpoch(FAKE_NSINFO, 1);
+
+ // Start a segment at txid 1, and write a batch of 3 txns.
+ journal.startLogSegment(makeRI(1), 1);
+ journal.journal(makeRI(2), 1, 3,
+ QJMTestUtil.createTxnData(1, 3));
+
+ GenericTestUtils.assertExists(
+ journal.getStorage().getInProgressEditLog(1));
+
+ // Try to start new segment at txid 6, this should abort old segment and
+ // then succeed, allowing us to write txid 6-8.
+ journal.startLogSegment(makeRI(3), 6);
+ journal.journal(makeRI(4), 6, 3,
+ QJMTestUtil.createTxnData(6, 3));
+
+ // The old segment should *not* be finalized.
+ GenericTestUtils.assertExists(
+ journal.getStorage().getInProgressEditLog(1));
+ GenericTestUtils.assertExists(
+ journal.getStorage().getInProgressEditLog(6));
+ }
+
+ /**
+ * Test behavior of startLogSegment() when a segment with the
+ * same transaction ID already exists.
+ */
+ @Test
+ public void testStartLogSegmentWhenAlreadyExists() throws Exception {
+ journal.newEpoch(FAKE_NSINFO, 1);
+
+ // Start a segment at txid 1, and write just 1 transaction. This
+ // would normally be the START_LOG_SEGMENT transaction.
+ journal.startLogSegment(makeRI(1), 1);
+ journal.journal(makeRI(2), 1, 1,
+ QJMTestUtil.createTxnData(1, 1));
+
+ // Try to start new segment at txid 1, this should succeed, because
+ // we are allowed to re-start a segment if we only ever had the
+ // START_LOG_SEGMENT transaction logged.
+ journal.startLogSegment(makeRI(3), 1);
+ journal.journal(makeRI(4), 1, 1,
+ QJMTestUtil.createTxnData(1, 1));
+
+ // This time through, write more transactions afterwards, simulating
+ // real user transactions.
+ journal.journal(makeRI(5), 2, 3,
+ QJMTestUtil.createTxnData(2, 3));
+
+ try {
+ journal.startLogSegment(makeRI(6), 1);
+ fail("Did not fail to start log segment which would overwrite " +
+ "an existing one");
+ } catch (IllegalStateException ise) {
+ GenericTestUtils.assertExceptionContains(
+ "seems to contain valid transactions", ise);
+ }
+
+ journal.finalizeLogSegment(makeRI(7), 1, 4);
+
+ // Ensure that we cannot overwrite a finalized segment
+ try {
+ journal.startLogSegment(makeRI(8), 1);
+ fail("Did not fail to start log segment which would overwrite " +
+ "an existing one");
+ } catch (IllegalStateException ise) {
+ GenericTestUtils.assertExceptionContains(
+ "have a finalized segment", ise);
+ }
+
+ }
private static RequestInfo makeRI(int serial) {
return new RequestInfo(JID, 1, serial);
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java?rev=1373183&r1=1373182&r2=1373183&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java Wed Aug 15 00:57:24 2012
@@ -118,7 +118,9 @@ public class TestJournalNode {
ch.startLogSegment(3).get();
response = ch.newEpoch(4).get();
ch.setEpoch(4);
- assertEquals(3, response.getLastSegmentTxId());
+ // Because the new segment is empty, it is equivalent to not having
+ // started writing it.
+ assertEquals(0, response.getLastSegmentTxId());
}
@Test