You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/12/05 20:22:25 UTC

svn commit: r1417596 [6/6] - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/ src/main/bin/ src/main/java/org/apache/hadoop/hdfs/ src/main/java...

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
+import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProtoOrBuilder;
+import org.apache.hadoop.hdfs.qjournal.server.Journal;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestJournal {
+  private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
+      12345, "mycluster", "my-bp", 0L);
+  private static final NamespaceInfo FAKE_NSINFO_2 = new NamespaceInfo(
+      6789, "mycluster", "my-bp", 0L);
+  
+  private static final String JID = "test-journal";
+
+  private static final File TEST_LOG_DIR = new File(
+      new File(MiniDFSCluster.getBaseDirectory()), "TestJournal");
+
+  private StorageErrorReporter mockErrorReporter = Mockito.mock(
+      StorageErrorReporter.class);
+
+  private Journal journal;
+
+  
+  @Before
+  public void setup() throws Exception {
+    FileUtil.fullyDelete(TEST_LOG_DIR);
+    journal = new Journal(TEST_LOG_DIR, JID, mockErrorReporter);
+    journal.format(FAKE_NSINFO);
+  }
+  
+  @After
+  public void verifyNoStorageErrors() throws Exception{
+    Mockito.verify(mockErrorReporter, Mockito.never())
+      .reportErrorOnFile(Mockito.<File>any());
+  }
+  
+  @After
+  public void cleanup() {
+    IOUtils.closeStream(journal);
+  }
+  
+  @Test
+  public void testEpochHandling() throws Exception {
+    assertEquals(0, journal.getLastPromisedEpoch());
+    NewEpochResponseProto newEpoch =
+        journal.newEpoch(FAKE_NSINFO, 1);
+    assertFalse(newEpoch.hasLastSegmentTxId());
+    assertEquals(1, journal.getLastPromisedEpoch());
+    journal.newEpoch(FAKE_NSINFO, 3);
+    assertFalse(newEpoch.hasLastSegmentTxId());
+    assertEquals(3, journal.getLastPromisedEpoch());
+    try {
+      journal.newEpoch(FAKE_NSINFO, 3);
+      fail("Should have failed to promise same epoch twice");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains(
+          "Proposed epoch 3 <= last promise 3", ioe);
+    }
+    try {
+      journal.startLogSegment(makeRI(1), 12345L);
+      fail("Should have rejected call from prior epoch");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains(
+          "epoch 1 is less than the last promised epoch 3", ioe);
+    }
+    try {
+      journal.journal(makeRI(1), 12345L, 100L, 0, new byte[0]);
+      fail("Should have rejected call from prior epoch");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains(
+          "epoch 1 is less than the last promised epoch 3", ioe);
+    }
+  }
+  
+  @Test
+  public void testMaintainCommittedTxId() throws Exception {
+    journal.newEpoch(FAKE_NSINFO, 1);
+    journal.startLogSegment(makeRI(1), 1);
+    // Send txids 1-3, with a request indicating only 0 committed
+    journal.journal(new RequestInfo(JID, 1, 2, 0), 1, 1, 3,
+        QJMTestUtil.createTxnData(1, 3));
+    assertEquals(0, journal.getCommittedTxnIdForTests());
+    
+    // Send 4-6, with request indicating that through 3 is committed.
+    journal.journal(new RequestInfo(JID, 1, 3, 3), 1, 4, 3,
+        QJMTestUtil.createTxnData(4, 6));
+    assertEquals(3, journal.getCommittedTxnIdForTests());    
+  }
+  
+  @Test
+  public void testRestartJournal() throws Exception {
+    journal.newEpoch(FAKE_NSINFO, 1);
+    journal.startLogSegment(makeRI(1), 1);
+    journal.journal(makeRI(2), 1, 1, 2, 
+        QJMTestUtil.createTxnData(1, 2));
+    // Don't finalize.
+    
+    String storageString = journal.getStorage().toColonSeparatedString();
+    System.err.println("storage string: " + storageString);
+    journal.close(); // close to unlock the storage dir
+    
+    // Now re-instantiate, make sure history is still there
+    journal = new Journal(TEST_LOG_DIR, JID, mockErrorReporter);
+    
+    // The storage info should be read, even if no writer has taken over.
+    assertEquals(storageString,
+        journal.getStorage().toColonSeparatedString());
+
+    assertEquals(1, journal.getLastPromisedEpoch());
+    NewEpochResponseProtoOrBuilder newEpoch = journal.newEpoch(FAKE_NSINFO, 2);
+    assertEquals(1, newEpoch.getLastSegmentTxId());
+  }
+  
+  @Test
+  public void testFormatResetsCachedValues() throws Exception {
+    journal.newEpoch(FAKE_NSINFO, 12345L);
+    journal.startLogSegment(new RequestInfo(JID, 12345L, 1L, 0L), 1L);
+
+    assertEquals(12345L, journal.getLastPromisedEpoch());
+    assertEquals(12345L, journal.getLastWriterEpoch());
+    assertTrue(journal.isFormatted());
+    
+    journal.format(FAKE_NSINFO_2);
+    
+    assertEquals(0, journal.getLastPromisedEpoch());
+    assertEquals(0, journal.getLastWriterEpoch());
+    assertTrue(journal.isFormatted());
+  }
+  
+  /**
+   * Test that, if the writer crashes at the very beginning of a segment,
+   * before any transactions are written, that the next newEpoch() call
+   * returns the prior segment txid as its most recent segment.
+   */
+  @Test
+  public void testNewEpochAtBeginningOfSegment() throws Exception {
+    journal.newEpoch(FAKE_NSINFO, 1);
+    journal.startLogSegment(makeRI(1), 1);
+    journal.journal(makeRI(2), 1, 1, 2, 
+        QJMTestUtil.createTxnData(1, 2));
+    journal.finalizeLogSegment(makeRI(3), 1, 2);
+    journal.startLogSegment(makeRI(4), 3);
+    NewEpochResponseProto resp = journal.newEpoch(FAKE_NSINFO, 2);
+    assertEquals(1, resp.getLastSegmentTxId());
+  }
+  
+  @Test
+  public void testJournalLocking() throws Exception {
+    Assume.assumeTrue(journal.getStorage().getStorageDir(0).isLockSupported());
+    StorageDirectory sd = journal.getStorage().getStorageDir(0);
+    File lockFile = new File(sd.getRoot(), Storage.STORAGE_FILE_LOCK);
+    
+    // Journal should be locked, since the format() call locks it.
+    GenericTestUtils.assertExists(lockFile);
+
+    journal.newEpoch(FAKE_NSINFO,  1);
+    try {
+      new Journal(TEST_LOG_DIR, JID, mockErrorReporter);
+      fail("Did not fail to create another journal in same dir");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains(
+          "Cannot lock storage", ioe);
+    }
+    
+    journal.close();
+    
+    // Journal should no longer be locked after the close() call.
+    // Hence, should be able to create a new Journal in the same dir.
+    Journal journal2 = new Journal(TEST_LOG_DIR, JID, mockErrorReporter);
+    journal2.newEpoch(FAKE_NSINFO, 2);
+  }
+  
+  /**
+   * Test finalizing a segment after some batch of edits were missed.
+   * This should fail, since we validate the log before finalization.
+   */
+  @Test
+  public void testFinalizeWhenEditsAreMissed() throws Exception {
+    journal.newEpoch(FAKE_NSINFO, 1);
+    journal.startLogSegment(makeRI(1), 1);
+    journal.journal(makeRI(2), 1, 1, 3,
+        QJMTestUtil.createTxnData(1, 3));
+    
+    // Try to finalize up to txn 6, even though we only wrote up to txn 3.
+    try {
+      journal.finalizeLogSegment(makeRI(3), 1, 6);
+      fail("did not fail to finalize");
+    } catch (JournalOutOfSyncException e) {
+      GenericTestUtils.assertExceptionContains(
+          "but only written up to txid 3", e);
+    }
+    
+    // Check that, even if we re-construct the journal by scanning the
+    // disk, we don't allow finalizing incorrectly.
+    journal.close();
+    journal = new Journal(TEST_LOG_DIR, JID, mockErrorReporter);
+    
+    try {
+      journal.finalizeLogSegment(makeRI(4), 1, 6);
+      fail("did not fail to finalize");
+    } catch (JournalOutOfSyncException e) {
+      GenericTestUtils.assertExceptionContains(
+          "disk only contains up to txid 3", e);
+    }
+  }
+  
+  /**
+   * Ensure that finalizing a segment which doesn't exist throws the
+   * appropriate exception.
+   */
+  @Test
+  public void testFinalizeMissingSegment() throws Exception {
+    journal.newEpoch(FAKE_NSINFO, 1);
+    try {
+      journal.finalizeLogSegment(makeRI(1), 1000, 1001);
+      fail("did not fail to finalize");
+    } catch (JournalOutOfSyncException e) {
+      GenericTestUtils.assertExceptionContains(
+          "No log file to finalize at transaction ID 1000", e);
+    }
+  }
+
+  /**
+   * Assume that a client is writing to a journal, but loses its connection
+   * in the middle of a segment. Thus, any future journal() calls in that
+   * segment may fail, because some txns were missed while the connection was
+   * down.
+   *
+   * Eventually, the connection comes back, and the NN tries to start a new
+   * segment at a higher txid. This should abort the old one and succeed.
+   */
+  @Test
+  public void testAbortOldSegmentIfFinalizeIsMissed() throws Exception {
+    journal.newEpoch(FAKE_NSINFO, 1);
+    
+    // Start a segment at txid 1, and write a batch of 3 txns.
+    journal.startLogSegment(makeRI(1), 1);
+    journal.journal(makeRI(2), 1, 1, 3,
+        QJMTestUtil.createTxnData(1, 3));
+
+    GenericTestUtils.assertExists(
+        journal.getStorage().getInProgressEditLog(1));
+    
+    // Try to start new segment at txid 6, this should abort old segment and
+    // then succeed, allowing us to write txid 6-9.
+    journal.startLogSegment(makeRI(3), 6);
+    journal.journal(makeRI(4), 6, 6, 3,
+        QJMTestUtil.createTxnData(6, 3));
+
+    // The old segment should *not* be finalized.
+    GenericTestUtils.assertExists(
+        journal.getStorage().getInProgressEditLog(1));
+    GenericTestUtils.assertExists(
+        journal.getStorage().getInProgressEditLog(6));
+  }
+  
+  /**
+   * Test behavior of startLogSegment() when a segment with the
+   * same transaction ID already exists.
+   */
+  @Test
+  public void testStartLogSegmentWhenAlreadyExists() throws Exception {
+    journal.newEpoch(FAKE_NSINFO, 1);
+    
+    // Start a segment at txid 1, and write just 1 transaction. This
+    // would normally be the START_LOG_SEGMENT transaction.
+    journal.startLogSegment(makeRI(1), 1);
+    journal.journal(makeRI(2), 1, 1, 1,
+        QJMTestUtil.createTxnData(1, 1));
+    
+    // Try to start new segment at txid 1, this should succeed, because
+    // we are allowed to re-start a segment if we only ever had the
+    // START_LOG_SEGMENT transaction logged.
+    journal.startLogSegment(makeRI(3), 1);
+    journal.journal(makeRI(4), 1, 1, 1,
+        QJMTestUtil.createTxnData(1, 1));
+
+    // This time through, write more transactions afterwards, simulating
+    // real user transactions.
+    journal.journal(makeRI(5), 1, 2, 3,
+        QJMTestUtil.createTxnData(2, 3));
+
+    try {
+      journal.startLogSegment(makeRI(6), 1);
+      fail("Did not fail to start log segment which would overwrite " +
+          "an existing one");
+    } catch (IllegalStateException ise) {
+      GenericTestUtils.assertExceptionContains(
+          "seems to contain valid transactions", ise);
+    }
+    
+    journal.finalizeLogSegment(makeRI(7), 1, 4);
+    
+    // Ensure that we cannot overwrite a finalized segment
+    try {
+      journal.startLogSegment(makeRI(8), 1);
+      fail("Did not fail to start log segment which would overwrite " +
+          "an existing one");
+    } catch (IllegalStateException ise) {
+      GenericTestUtils.assertExceptionContains(
+          "have a finalized segment", ise);
+    }
+
+  }
+  
+  private static RequestInfo makeRI(int serial) {
+    return new RequestInfo(JID, 1, serial, 0);
+  }
+  
+  @Test
+  public void testNamespaceVerification() throws Exception {
+    journal.newEpoch(FAKE_NSINFO, 1);
+
+    try {
+      journal.newEpoch(FAKE_NSINFO_2, 2);
+      fail("Did not fail newEpoch() when namespaces mismatched");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains(
+          "Incompatible namespaceID", ioe);
+    }
+  }
+
+}

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
+import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.server.Journal;
+import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.MetricsAsserts;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Stopwatch;
+import com.google.common.primitives.Bytes;
+import com.google.common.primitives.Ints;
+
+
+public class TestJournalNode {
+  private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
+      12345, "mycluster", "my-bp", 0L);
+
+  private JournalNode jn;
+  private Journal journal; 
+  private Configuration conf = new Configuration();
+  private IPCLoggerChannel ch;
+  private String journalId;
+
+  static {
+    // Avoid an error when we double-initialize JvmMetrics
+    DefaultMetricsSystem.setMiniClusterMode(true);
+  }
+  
+  @Before
+  public void setup() throws Exception {
+    File editsDir = new File(MiniDFSCluster.getBaseDirectory() +
+        File.separator + "TestJournalNode");
+    FileUtil.fullyDelete(editsDir);
+    
+    conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
+        editsDir.getAbsolutePath());
+    conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY,
+        "0.0.0.0:0");
+    jn = new JournalNode();
+    jn.setConf(conf);
+    jn.start();
+    journalId = "test-journalid-" + GenericTestUtils.uniqueSequenceId();
+    journal = jn.getOrCreateJournal(journalId);
+    journal.format(FAKE_NSINFO);
+    
+    ch = new IPCLoggerChannel(conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress());
+  }
+  
+  @After
+  public void teardown() throws Exception {
+    jn.stop(0);
+  }
+  
+  @Test
+  public void testJournal() throws Exception {
+    MetricsRecordBuilder metrics = MetricsAsserts.getMetrics(
+        journal.getMetricsForTests().getName());
+    MetricsAsserts.assertCounter("BatchesWritten", 0L, metrics);
+    MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
+    MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);
+
+    IPCLoggerChannel ch = new IPCLoggerChannel(
+        conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress());
+    ch.newEpoch(1).get();
+    ch.setEpoch(1);
+    ch.startLogSegment(1).get();
+    ch.sendEdits(1L, 1, 1, "hello".getBytes(Charsets.UTF_8)).get();
+    
+    metrics = MetricsAsserts.getMetrics(
+        journal.getMetricsForTests().getName());
+    MetricsAsserts.assertCounter("BatchesWritten", 1L, metrics);
+    MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
+    MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);
+
+    ch.setCommittedTxId(100L);
+    ch.sendEdits(1L, 2, 1, "goodbye".getBytes(Charsets.UTF_8)).get();
+
+    metrics = MetricsAsserts.getMetrics(
+        journal.getMetricsForTests().getName());
+    MetricsAsserts.assertCounter("BatchesWritten", 2L, metrics);
+    MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 1L, metrics);
+    MetricsAsserts.assertGauge("CurrentLagTxns", 98L, metrics);
+
+  }
+  
+  
+  @Test
+  public void testReturnsSegmentInfoAtEpochTransition() throws Exception {
+    ch.newEpoch(1).get();
+    ch.setEpoch(1);
+    ch.startLogSegment(1).get();
+    ch.sendEdits(1L, 1, 2, QJMTestUtil.createTxnData(1, 2)).get();
+    
+    // Switch to a new epoch without closing earlier segment
+    NewEpochResponseProto response = ch.newEpoch(2).get();
+    ch.setEpoch(2);
+    assertEquals(1, response.getLastSegmentTxId());
+    
+    ch.finalizeLogSegment(1, 2).get();
+    
+    // Switch to a new epoch after just closing the earlier segment.
+    response = ch.newEpoch(3).get();
+    ch.setEpoch(3);
+    assertEquals(1, response.getLastSegmentTxId());
+    
+    // Start a segment but don't write anything, check newEpoch segment info
+    ch.startLogSegment(3).get();
+    response = ch.newEpoch(4).get();
+    ch.setEpoch(4);
+    // Because the new segment is empty, it is equivalent to not having
+    // started writing it. Hence, we should return the prior segment txid.
+    assertEquals(1, response.getLastSegmentTxId());
+  }
+  
+  @Test
+  public void testHttpServer() throws Exception {
+    InetSocketAddress addr = jn.getBoundHttpAddress();
+    assertTrue(addr.getPort() > 0);
+    
+    String urlRoot = "http://localhost:" + addr.getPort();
+    
+    // Check default servlets.
+    String pageContents = DFSTestUtil.urlGet(new URL(urlRoot + "/jmx"));
+    assertTrue("Bad contents: " + pageContents,
+        pageContents.contains(
+            "Hadoop:service=JournalNode,name=JvmMetrics"));
+    
+    // Check JSP page.
+    pageContents = DFSTestUtil.urlGet(
+        new URL(urlRoot + "/journalstatus.jsp"));
+    assertTrue(pageContents.contains("JournalNode"));
+
+    // Create some edits on server side
+    byte[] EDITS_DATA = QJMTestUtil.createTxnData(1, 3);
+    IPCLoggerChannel ch = new IPCLoggerChannel(
+        conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress());
+    ch.newEpoch(1).get();
+    ch.setEpoch(1);
+    ch.startLogSegment(1).get();
+    ch.sendEdits(1L, 1, 3, EDITS_DATA).get();
+    ch.finalizeLogSegment(1, 3).get();
+
+    // Attempt to retrieve via HTTP, ensure we get the data back
+    // including the header we expected
+    byte[] retrievedViaHttp = DFSTestUtil.urlGetBytes(new URL(urlRoot +
+        "/getJournal?segmentTxId=1&jid=" + journalId));
+    byte[] expected = Bytes.concat(
+            Ints.toByteArray(HdfsConstants.LAYOUT_VERSION),
+            EDITS_DATA);
+
+    assertArrayEquals(expected, retrievedViaHttp);
+    
+    // Attempt to fetch a non-existent file, check that we get an
+    // error status code
+    URL badUrl = new URL(urlRoot + "/getJournal?segmentTxId=12345&jid=" + journalId);
+    HttpURLConnection connection = (HttpURLConnection)badUrl.openConnection();
+    try {
+      assertEquals(404, connection.getResponseCode());
+    } finally {
+      connection.disconnect();
+    }
+  }
+
+  /**
+   * Test that the JournalNode performs correctly as a Paxos
+   * <em>Acceptor</em> process.
+   */
+  @Test
+  public void testAcceptRecoveryBehavior() throws Exception {
+    // We need to run newEpoch() first, or else we have no way to distinguish
+    // different proposals for the same decision.
+    try {
+      ch.prepareRecovery(1L).get();
+      fail("Did not throw IllegalState when trying to run paxos without an epoch");
+    } catch (ExecutionException ise) {
+      GenericTestUtils.assertExceptionContains("bad epoch", ise);
+    }
+    
+    ch.newEpoch(1).get();
+    ch.setEpoch(1);
+    
+    // prepare() with no previously accepted value and no logs present
+    PrepareRecoveryResponseProto prep = ch.prepareRecovery(1L).get();
+    System.err.println("Prep: " + prep);
+    assertFalse(prep.hasAcceptedInEpoch());
+    assertFalse(prep.hasSegmentState());
+    
+    // Make a log segment, and prepare again -- this time should see the
+    // segment existing.
+    ch.startLogSegment(1L).get();
+    ch.sendEdits(1L, 1L, 1, QJMTestUtil.createTxnData(1, 1)).get();
+
+    prep = ch.prepareRecovery(1L).get();
+    System.err.println("Prep: " + prep);
+    assertFalse(prep.hasAcceptedInEpoch());
+    assertTrue(prep.hasSegmentState());
+    
+    // accept() should save the accepted value in persistent storage
+    ch.acceptRecovery(prep.getSegmentState(), new URL("file:///dev/null")).get();
+
+    // So another prepare() call from a new epoch would return this value
+    ch.newEpoch(2);
+    ch.setEpoch(2);
+    prep = ch.prepareRecovery(1L).get();
+    assertEquals(1L, prep.getAcceptedInEpoch());
+    assertEquals(1L, prep.getSegmentState().getEndTxId());
+    
+    // A prepare() or accept() call from an earlier epoch should now be rejected
+    ch.setEpoch(1);
+    try {
+      ch.prepareRecovery(1L).get();
+      fail("prepare from earlier epoch not rejected");
+    } catch (ExecutionException ioe) {
+      GenericTestUtils.assertExceptionContains(
+          "epoch 1 is less than the last promised epoch 2",
+          ioe);
+    }
+    try {
+      ch.acceptRecovery(prep.getSegmentState(), new URL("file:///dev/null")).get();
+      fail("accept from earlier epoch not rejected");
+    } catch (ExecutionException ioe) {
+      GenericTestUtils.assertExceptionContains(
+          "epoch 1 is less than the last promised epoch 2",
+          ioe);
+    }
+  }
+  
+  @Test
+  public void testFailToStartWithBadConfig() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, "non-absolute-path");
+    assertJNFailsToStart(conf, "should be an absolute path");
+    
+    // Existing file which is not a directory 
+    conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, "/dev/null");
+    assertJNFailsToStart(conf, "is not a directory");
+    
+    // Directory which cannot be created
+    conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, "/proc/does-not-exist");
+    assertJNFailsToStart(conf, "Could not create");
+
+  }
+
+  private static void assertJNFailsToStart(Configuration conf,
+      String errString) {
+    try {
+      JournalNode jn = new JournalNode();
+      jn.setConf(conf);
+      jn.start();
+    } catch (Exception e) {
+      GenericTestUtils.assertExceptionContains(errString, e);
+    }
+  }
+  
+  /**
+   * Simple test of how fast the code path is to write edits.
+   * This isn't a true unit test, but can be run manually to
+   * check performance.
+   * 
+   * At the time of development, this test ran in ~4sec on an
+   * SSD-enabled laptop (1.8ms/batch).
+   */
+  @Test(timeout=100000)
+  public void testPerformance() throws Exception {
+    doPerfTest(8192, 1024); // 8MB
+  }
+  
+  private void doPerfTest(int editsSize, int numEdits) throws Exception {
+    byte[] data = new byte[editsSize];
+    ch.newEpoch(1).get();
+    ch.setEpoch(1);
+    ch.startLogSegment(1).get();
+    
+    Stopwatch sw = new Stopwatch().start();
+    for (int i = 1; i < numEdits; i++) {
+      ch.sendEdits(1L, i, 1, data).get();
+    }
+    long time = sw.elapsedMillis();
+    
+    System.err.println("Wrote " + numEdits + " batches of " + editsSize +
+        " bytes in " + time + "ms");
+    float avgRtt = (float)time/(float)numEdits;
+    long throughput = ((long)numEdits * editsSize * 1000L)/time;
+    System.err.println("Time per batch: " + avgRtt + "ms");
+    System.err.println("Throughput: " + throughput + " bytes/sec");
+  }
+}

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java Wed Dec  5 19:22:17 2012
@@ -74,6 +74,7 @@ import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Assume;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -87,14 +88,6 @@ public class TestBlockToken {
   public static final Log LOG = LogFactory.getLog(TestBlockToken.class);
   private static final String ADDRESS = "0.0.0.0";
 
-  static final String SERVER_PRINCIPAL_KEY = "test.ipc.server.principal";
-  private static Configuration conf;
-  static {
-    conf = new Configuration();
-    conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
-    UserGroupInformation.setConfiguration(conf);
-  }
-
   static {
     ((Log4JLogger) Client.LOG).getLogger().setLevel(Level.ALL);
     ((Log4JLogger) Server.LOG).getLogger().setLevel(Level.ALL);
@@ -111,6 +104,13 @@ public class TestBlockToken {
   ExtendedBlock block1 = new ExtendedBlock("0", 0L);
   ExtendedBlock block2 = new ExtendedBlock("10", 10L);
   ExtendedBlock block3 = new ExtendedBlock("-10", -108L);
+  
+  @Before
+  public void disableKerberos() {
+    Configuration conf = new Configuration();
+    conf.set(HADOOP_SECURITY_AUTHENTICATION, "simple");
+    UserGroupInformation.setConfiguration(conf);
+  }
 
   private static class GetLengthAnswer implements
       Answer<GetReplicaVisibleLengthResponseProto> {
@@ -215,8 +215,9 @@ public class TestBlockToken {
     tokenGenerationAndVerification(masterHandler, slaveHandler);
   }
 
-  private Server createMockDatanode(BlockTokenSecretManager sm,
-      Token<BlockTokenIdentifier> token) throws IOException, ServiceException {
+  private static Server createMockDatanode(BlockTokenSecretManager sm,
+      Token<BlockTokenIdentifier> token, Configuration conf)
+      throws IOException, ServiceException {
     ClientDatanodeProtocolPB mockDN = mock(ClientDatanodeProtocolPB.class);
 
     BlockTokenIdentifier id = sm.createIdentifier();
@@ -237,12 +238,16 @@ public class TestBlockToken {
 
   @Test
   public void testBlockTokenRpc() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+    
     BlockTokenSecretManager sm = new BlockTokenSecretManager(
         blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
     Token<BlockTokenIdentifier> token = sm.generateToken(block3,
         EnumSet.allOf(BlockTokenSecretManager.AccessMode.class));
 
-    final Server server = createMockDatanode(sm, token);
+    final Server server = createMockDatanode(sm, token, conf);
 
     server.start();
 
@@ -271,13 +276,17 @@ public class TestBlockToken {
    */
   @Test
   public void testBlockTokenRpcLeak() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+    
     Assume.assumeTrue(FD_DIR.exists());
     BlockTokenSecretManager sm = new BlockTokenSecretManager(
         blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
     Token<BlockTokenIdentifier> token = sm.generateToken(block3,
         EnumSet.allOf(BlockTokenSecretManager.AccessMode.class));
 
-    final Server server = createMockDatanode(sm, token);
+    final Server server = createMockDatanode(sm, token, conf);
     server.start();
 
     final InetSocketAddress addr = NetUtils.getConnectAddress(server);

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java Wed Dec  5 19:22:17 2012
@@ -17,11 +17,16 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.spy;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -182,6 +187,15 @@ public class NameNodeAdapter {
     }
   }
   
+  public static FSEditLogOp createMkdirOp(String path) {
+    MkdirOp op = MkdirOp.getInstance(new FSEditLogOp.OpInstanceCache())
+      .setPath(path)
+      .setTimestamp(0)
+      .setPermissionStatus(new PermissionStatus(
+              "testuser", "testgroup", FsPermission.getDefault()));
+    return op;
+  }
+  
   /**
    * @return the number of blocks marked safe by safemode, or -1
    * if safemode is not running.

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java Wed Dec  5 19:22:17 2012
@@ -1220,7 +1220,7 @@ public class TestEditLog {
       elfos.create();
       elfos.writeRaw(garbage, 0, garbage.length);
       elfos.setReadyToFlush();
-      elfos.flushAndSync();
+      elfos.flushAndSync(true);
       elfos.close();
       elfos = null;
       file = new File(TEST_LOG_NAME);

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java Wed Dec  5 19:22:17 2012
@@ -55,7 +55,7 @@ public class TestEditLogFileOutputStream
   static void flushAndCheckLength(EditLogFileOutputStream elos,
       long expectedLength) throws IOException {
     elos.setReadyToFlush();
-    elos.flushAndSync();
+    elos.flushAndSync(true);
     assertEquals(expectedLength, elos.getFile().length());
   }
   

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java Wed Dec  5 19:22:17 2012
@@ -74,7 +74,7 @@ public class TestNameNodeRecovery {
 
       elts.addTransactionsToLog(elfos, cache);
       elfos.setReadyToFlush();
-      elfos.flushAndSync();
+      elfos.flushAndSync(true);
       elfos.close();
       elfos = null;
       file = new File(TEST_LOG_NAME);

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java Wed Dec  5 19:22:17 2012
@@ -166,13 +166,13 @@ public class TestInitializeSharedEdits {
   }
   
   @Test
-  public void testDontOverWriteExistingDir() {
+  public void testDontOverWriteExistingDir() throws IOException {
     assertFalse(NameNode.initializeSharedEdits(conf, false));
     assertTrue(NameNode.initializeSharedEdits(conf, false));
   }
   
   @Test
-  public void testInitializeSharedEditsConfiguresGenericConfKeys() {
+  public void testInitializeSharedEditsConfiguresGenericConfKeys() throws IOException {
     Configuration conf = new Configuration();
     conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns1");
     conf.set(DFSUtil.addKeySuffixes(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX,

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestBestEffortLongFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestBestEffortLongFile.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestBestEffortLongFile.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestBestEffortLongFile.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestBestEffortLongFile {
+
+  private static final File FILE = new File(MiniDFSCluster.getBaseDirectory() +
+      File.separatorChar + "TestBestEffortLongFile");
+
+  @Before
+  public void cleanup() {
+    if (FILE.exists()) {
+      assertTrue(FILE.delete());
+    }
+    FILE.getParentFile().mkdirs();
+  }
+  
+  @Test
+  public void testGetSet() throws IOException {
+    BestEffortLongFile f = new BestEffortLongFile(FILE, 12345L);
+    try {
+      // Before the file exists, should return default.
+      assertEquals(12345L, f.get());
+      
+      // And first access should open it.
+      assertTrue(FILE.exists());
+  
+      Random r = new Random();
+      for (int i = 0; i < 100; i++) {
+        long newVal = r.nextLong();
+        // Changing the value should be reflected in the next get() call.
+        f.set(newVal);
+        assertEquals(newVal, f.get());
+        
+        // And should be reflected in a new instance (ie it actually got
+        // written to the file)
+        BestEffortLongFile f2 = new BestEffortLongFile(FILE, 999L);
+        try {
+          assertEquals(newVal, f2.get());
+        } finally {
+          IOUtils.closeStream(f2);
+        }
+      }
+    } finally {
+      IOUtils.closeStream(f);
+    }
+  }
+  
+  @Test
+  public void testTruncatedFileReturnsDefault() throws IOException {
+    assertTrue(FILE.createNewFile());
+    assertEquals(0, FILE.length());
+    BestEffortLongFile f = new BestEffortLongFile(FILE, 12345L);
+    try {
+      assertEquals(12345L, f.get());
+    } finally {
+      f.close();
+    }
+  }
+}