Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/12/05 20:22:25 UTC
svn commit: r1417596 [6/6] - in
/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./
dev-support/
src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/
src/main/bin/ src/main/java/org/apache/hadoop/hdfs/ src/main/java...
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java Wed Dec 5 19:22:17 2012
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
+import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProtoOrBuilder;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestJournal {
+ private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
+ 12345, "mycluster", "my-bp", 0L);
+ private static final NamespaceInfo FAKE_NSINFO_2 = new NamespaceInfo(
+ 6789, "mycluster", "my-bp", 0L);
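+ // NamespaceInfo arguments here are (namespaceID, clusterID,
+ // blockPoolID, cTime); the two constants differ only in namespaceID
+ // so that testNamespaceVerification can provoke a mismatch.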
+
+ private static final String JID = "test-journal";
+
+ private static final File TEST_LOG_DIR = new File(
+ new File(MiniDFSCluster.getBaseDirectory()), "TestJournal");
+
+ private StorageErrorReporter mockErrorReporter = Mockito.mock(
+ StorageErrorReporter.class);
+
+ private Journal journal;
+
+
+ @Before
+ public void setup() throws Exception {
+ FileUtil.fullyDelete(TEST_LOG_DIR);
+ journal = new Journal(TEST_LOG_DIR, JID, mockErrorReporter);
+ journal.format(FAKE_NSINFO);
+ }
+
+ @After
+ public void verifyNoStorageErrors() throws Exception {
+ Mockito.verify(mockErrorReporter, Mockito.never())
+ .reportErrorOnFile(Mockito.<File>any());
+ }
+
+ @After
+ public void cleanup() {
+ IOUtils.closeStream(journal);
+ }
+
+ @Test
+ public void testEpochHandling() throws Exception {
+ assertEquals(0, journal.getLastPromisedEpoch());
+ NewEpochResponseProto newEpoch =
+ journal.newEpoch(FAKE_NSINFO, 1);
+ assertFalse(newEpoch.hasLastSegmentTxId());
+ assertEquals(1, journal.getLastPromisedEpoch());
+ newEpoch = journal.newEpoch(FAKE_NSINFO, 3);
+ assertFalse(newEpoch.hasLastSegmentTxId());
+ assertEquals(3, journal.getLastPromisedEpoch());
+ try {
+ journal.newEpoch(FAKE_NSINFO, 3);
+ fail("Should have failed to promise same epoch twice");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains(
+ "Proposed epoch 3 <= last promise 3", ioe);
+ }
+ try {
+ journal.startLogSegment(makeRI(1), 12345L);
+ fail("Should have rejected call from prior epoch");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains(
+ "epoch 1 is less than the last promised epoch 3", ioe);
+ }
+ try {
+ journal.journal(makeRI(1), 12345L, 100L, 0, new byte[0]);
+ fail("Should have rejected call from prior epoch");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains(
+ "epoch 1 is less than the last promised epoch 3", ioe);
+ }
+ }
+
+ @Test
+ public void testMaintainCommittedTxId() throws Exception {
+ journal.newEpoch(FAKE_NSINFO, 1);
+ journal.startLogSegment(makeRI(1), 1);
+ // Send txids 1-3, with a request indicating only 0 committed
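+ // RequestInfo fields are (journalId, epoch, ipcSerialNumber,
+ // committedTxId); the trailing committedTxId is how the writer
+ // piggybacks its commit watermark onto each journal() call.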
+ journal.journal(new RequestInfo(JID, 1, 2, 0), 1, 1, 3,
+ QJMTestUtil.createTxnData(1, 3));
+ assertEquals(0, journal.getCommittedTxnIdForTests());
+
+ // Send 4-6, with request indicating that through 3 is committed.
+ journal.journal(new RequestInfo(JID, 1, 3, 3), 1, 4, 3,
+ QJMTestUtil.createTxnData(4, 3));
+ assertEquals(3, journal.getCommittedTxnIdForTests());
+ }
+
+ @Test
+ public void testRestartJournal() throws Exception {
+ journal.newEpoch(FAKE_NSINFO, 1);
+ journal.startLogSegment(makeRI(1), 1);
+ journal.journal(makeRI(2), 1, 1, 2,
+ QJMTestUtil.createTxnData(1, 2));
+ // Don't finalize.
+
+ String storageString = journal.getStorage().toColonSeparatedString();
+ System.err.println("storage string: " + storageString);
+ journal.close(); // close to unlock the storage dir
+
+ // Now re-instantiate, make sure history is still there
+ journal = new Journal(TEST_LOG_DIR, JID, mockErrorReporter);
+
+ // The storage info should be read, even if no writer has taken over.
+ assertEquals(storageString,
+ journal.getStorage().toColonSeparatedString());
+
+ assertEquals(1, journal.getLastPromisedEpoch());
+ NewEpochResponseProtoOrBuilder newEpoch = journal.newEpoch(FAKE_NSINFO, 2);
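+ // The segment started at txid 1 was left in-progress across the
+ // restart, so the epoch-2 response should report it as the most
+ // recent segment.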
+ assertEquals(1, newEpoch.getLastSegmentTxId());
+ }
+
+ @Test
+ public void testFormatResetsCachedValues() throws Exception {
+ journal.newEpoch(FAKE_NSINFO, 12345L);
+ journal.startLogSegment(new RequestInfo(JID, 12345L, 1L, 0L), 1L);
+
+ assertEquals(12345L, journal.getLastPromisedEpoch());
+ assertEquals(12345L, journal.getLastWriterEpoch());
+ assertTrue(journal.isFormatted());
+
+ journal.format(FAKE_NSINFO_2);
+
+ assertEquals(0, journal.getLastPromisedEpoch());
+ assertEquals(0, journal.getLastWriterEpoch());
+ assertTrue(journal.isFormatted());
+ }
+
+ /**
+ * Test that, if the writer crashes at the very beginning of a segment,
+ * before any transactions are written, the next newEpoch() call
+ * returns the prior segment txid as its most recent segment.
+ */
+ @Test
+ public void testNewEpochAtBeginningOfSegment() throws Exception {
+ journal.newEpoch(FAKE_NSINFO, 1);
+ journal.startLogSegment(makeRI(1), 1);
+ journal.journal(makeRI(2), 1, 1, 2,
+ QJMTestUtil.createTxnData(1, 2));
+ journal.finalizeLogSegment(makeRI(3), 1, 2);
+ journal.startLogSegment(makeRI(4), 3);
+ NewEpochResponseProto resp = journal.newEpoch(FAKE_NSINFO, 2);
+ assertEquals(1, resp.getLastSegmentTxId());
+ }
+
+ @Test
+ public void testJournalLocking() throws Exception {
+ Assume.assumeTrue(journal.getStorage().getStorageDir(0).isLockSupported());
+ StorageDirectory sd = journal.getStorage().getStorageDir(0);
+ File lockFile = new File(sd.getRoot(), Storage.STORAGE_FILE_LOCK);
+
+ // Journal should be locked, since the format() call locks it.
+ GenericTestUtils.assertExists(lockFile);
+
+ journal.newEpoch(FAKE_NSINFO, 1);
+ try {
+ new Journal(TEST_LOG_DIR, JID, mockErrorReporter);
+ fail("Did not fail to create another journal in same dir");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains(
+ "Cannot lock storage", ioe);
+ }
+
+ journal.close();
+
+ // Journal should no longer be locked after the close() call.
+ // Hence, should be able to create a new Journal in the same dir.
+ Journal journal2 = new Journal(TEST_LOG_DIR, JID, mockErrorReporter);
+ journal2.newEpoch(FAKE_NSINFO, 2);
+ }
+
+ /**
+ * Test finalizing a segment after a batch of edits was missed.
+ * This should fail, since we validate the log before finalization.
+ */
+ @Test
+ public void testFinalizeWhenEditsAreMissed() throws Exception {
+ journal.newEpoch(FAKE_NSINFO, 1);
+ journal.startLogSegment(makeRI(1), 1);
+ journal.journal(makeRI(2), 1, 1, 3,
+ QJMTestUtil.createTxnData(1, 3));
+
+ // Try to finalize up to txn 6, even though we only wrote up to txn 3.
+ try {
+ journal.finalizeLogSegment(makeRI(3), 1, 6);
+ fail("did not fail to finalize");
+ } catch (JournalOutOfSyncException e) {
+ GenericTestUtils.assertExceptionContains(
+ "but only written up to txid 3", e);
+ }
+
+ // Check that, even if we re-construct the journal by scanning the
+ // disk, we don't allow finalizing incorrectly.
+ journal.close();
+ journal = new Journal(TEST_LOG_DIR, JID, mockErrorReporter);
+
+ try {
+ journal.finalizeLogSegment(makeRI(4), 1, 6);
+ fail("did not fail to finalize");
+ } catch (JournalOutOfSyncException e) {
+ GenericTestUtils.assertExceptionContains(
+ "disk only contains up to txid 3", e);
+ }
+ }
+
+ /**
+ * Ensure that finalizing a segment which doesn't exist throws the
+ * appropriate exception.
+ */
+ @Test
+ public void testFinalizeMissingSegment() throws Exception {
+ journal.newEpoch(FAKE_NSINFO, 1);
+ try {
+ journal.finalizeLogSegment(makeRI(1), 1000, 1001);
+ fail("did not fail to finalize");
+ } catch (JournalOutOfSyncException e) {
+ GenericTestUtils.assertExceptionContains(
+ "No log file to finalize at transaction ID 1000", e);
+ }
+ }
+
+ /**
+ * Assume that a client is writing to a journal, but loses its connection
+ * in the middle of a segment. Thus, any future journal() calls in that
+ * segment may fail, because some txns were missed while the connection was
+ * down.
+ *
+ * Eventually, the connection comes back, and the NN tries to start a new
+ * segment at a higher txid. This should abort the old one and succeed.
+ */
+ @Test
+ public void testAbortOldSegmentIfFinalizeIsMissed() throws Exception {
+ journal.newEpoch(FAKE_NSINFO, 1);
+
+ // Start a segment at txid 1, and write a batch of 3 txns.
+ journal.startLogSegment(makeRI(1), 1);
+ journal.journal(makeRI(2), 1, 1, 3,
+ QJMTestUtil.createTxnData(1, 3));
+
+ GenericTestUtils.assertExists(
+ journal.getStorage().getInProgressEditLog(1));
+
+ // Try to start a new segment at txid 6; this should abort the old
+ // segment and then succeed, allowing us to write txids 6-8.
+ journal.startLogSegment(makeRI(3), 6);
+ journal.journal(makeRI(4), 6, 6, 3,
+ QJMTestUtil.createTxnData(6, 3));
+
+ // The old segment should *not* be finalized.
+ GenericTestUtils.assertExists(
+ journal.getStorage().getInProgressEditLog(1));
+ GenericTestUtils.assertExists(
+ journal.getStorage().getInProgressEditLog(6));
+ }
+
+ /**
+ * Test behavior of startLogSegment() when a segment with the
+ * same transaction ID already exists.
+ */
+ @Test
+ public void testStartLogSegmentWhenAlreadyExists() throws Exception {
+ journal.newEpoch(FAKE_NSINFO, 1);
+
+ // Start a segment at txid 1, and write just 1 transaction. This
+ // would normally be the START_LOG_SEGMENT transaction.
+ journal.startLogSegment(makeRI(1), 1);
+ journal.journal(makeRI(2), 1, 1, 1,
+ QJMTestUtil.createTxnData(1, 1));
+
+ // Try to start a new segment at txid 1; this should succeed, because
+ // we are allowed to re-start a segment if we only ever had the
+ // START_LOG_SEGMENT transaction logged.
+ journal.startLogSegment(makeRI(3), 1);
+ journal.journal(makeRI(4), 1, 1, 1,
+ QJMTestUtil.createTxnData(1, 1));
+
+ // This time through, write more transactions afterwards, simulating
+ // real user transactions.
+ journal.journal(makeRI(5), 1, 2, 3,
+ QJMTestUtil.createTxnData(2, 3));
+
+ try {
+ journal.startLogSegment(makeRI(6), 1);
+ fail("Did not fail to start log segment which would overwrite " +
+ "an existing one");
+ } catch (IllegalStateException ise) {
+ GenericTestUtils.assertExceptionContains(
+ "seems to contain valid transactions", ise);
+ }
+
+ journal.finalizeLogSegment(makeRI(7), 1, 4);
+
+ // Ensure that we cannot overwrite a finalized segment
+ try {
+ journal.startLogSegment(makeRI(8), 1);
+ fail("Did not fail to start log segment which would overwrite " +
+ "an existing one");
+ } catch (IllegalStateException ise) {
+ GenericTestUtils.assertExceptionContains(
+ "have a finalized segment", ise);
+ }
+
+ }
+
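+ /**
+ * Build a RequestInfo for epoch 1 with the given IPC serial number
+ * and no committed-txid watermark, matching the single-epoch tests
+ * above.
+ */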
+ private static RequestInfo makeRI(int serial) {
+ return new RequestInfo(JID, 1, serial, 0);
+ }
+
+ @Test
+ public void testNamespaceVerification() throws Exception {
+ journal.newEpoch(FAKE_NSINFO, 1);
+
+ try {
+ journal.newEpoch(FAKE_NSINFO_2, 2);
+ fail("Did not fail newEpoch() when namespaces mismatched");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains(
+ "Incompatible namespaceID", ioe);
+ }
+ }
+
+}
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java Wed Dec 5 19:22:17 2012
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
+import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.MetricsAsserts;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Stopwatch;
+import com.google.common.primitives.Bytes;
+import com.google.common.primitives.Ints;
+
+
+public class TestJournalNode {
+ private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
+ 12345, "mycluster", "my-bp", 0L);
+
+ private JournalNode jn;
+ private Journal journal;
+ private Configuration conf = new Configuration();
+ private IPCLoggerChannel ch;
+ private String journalId;
+
+ static {
+ // Avoid an error when we double-initialize JvmMetrics
+ DefaultMetricsSystem.setMiniClusterMode(true);
+ }
+
+ @Before
+ public void setup() throws Exception {
+ File editsDir = new File(MiniDFSCluster.getBaseDirectory() +
+ File.separator + "TestJournalNode");
+ FileUtil.fullyDelete(editsDir);
+
+ conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
+ editsDir.getAbsolutePath());
+ conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY,
+ "0.0.0.0:0");
+ jn = new JournalNode();
+ jn.setConf(conf);
+ jn.start();
+ journalId = "test-journalid-" + GenericTestUtils.uniqueSequenceId();
+ journal = jn.getOrCreateJournal(journalId);
+ journal.format(FAKE_NSINFO);
+
+ ch = new IPCLoggerChannel(conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress());
+ }
+
+ @After
+ public void teardown() throws Exception {
+ jn.stop(0);
+ }
+
+ @Test
+ public void testJournal() throws Exception {
+ MetricsRecordBuilder metrics = MetricsAsserts.getMetrics(
+ journal.getMetricsForTests().getName());
+ MetricsAsserts.assertCounter("BatchesWritten", 0L, metrics);
+ MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
+ MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);
+
+ IPCLoggerChannel ch = new IPCLoggerChannel(
+ conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress());
+ ch.newEpoch(1).get();
+ ch.setEpoch(1);
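+ // IPCLoggerChannel queues each RPC on a background executor and
+ // returns a future, hence the .get() calls to wait for completion;
+ // setEpoch() stamps subsequent requests with the promised epoch.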
+ ch.startLogSegment(1).get();
+ ch.sendEdits(1L, 1, 1, "hello".getBytes(Charsets.UTF_8)).get();
+
+ metrics = MetricsAsserts.getMetrics(
+ journal.getMetricsForTests().getName());
+ MetricsAsserts.assertCounter("BatchesWritten", 1L, metrics);
+ MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
+ MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);
+
+ ch.setCommittedTxId(100L);
+ ch.sendEdits(1L, 2, 1, "goodbye".getBytes(Charsets.UTF_8)).get();
+
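+ // The writer has advertised committedTxId=100 while only txid 2 has
+ // been written here, so CurrentLagTxns should read 100 - 2 = 98 and
+ // the second batch counts as written-while-lagging.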
+ metrics = MetricsAsserts.getMetrics(
+ journal.getMetricsForTests().getName());
+ MetricsAsserts.assertCounter("BatchesWritten", 2L, metrics);
+ MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 1L, metrics);
+ MetricsAsserts.assertGauge("CurrentLagTxns", 98L, metrics);
+
+ }
+
+
+ @Test
+ public void testReturnsSegmentInfoAtEpochTransition() throws Exception {
+ ch.newEpoch(1).get();
+ ch.setEpoch(1);
+ ch.startLogSegment(1).get();
+ ch.sendEdits(1L, 1, 2, QJMTestUtil.createTxnData(1, 2)).get();
+
+ // Switch to a new epoch without closing earlier segment
+ NewEpochResponseProto response = ch.newEpoch(2).get();
+ ch.setEpoch(2);
+ assertEquals(1, response.getLastSegmentTxId());
+
+ ch.finalizeLogSegment(1, 2).get();
+
+ // Switch to a new epoch after just closing the earlier segment.
+ response = ch.newEpoch(3).get();
+ ch.setEpoch(3);
+ assertEquals(1, response.getLastSegmentTxId());
+
+ // Start a segment but don't write anything, check newEpoch segment info
+ ch.startLogSegment(3).get();
+ response = ch.newEpoch(4).get();
+ ch.setEpoch(4);
+ // Because the new segment is empty, it is equivalent to not having
+ // started writing it. Hence, we should return the prior segment txid.
+ assertEquals(1, response.getLastSegmentTxId());
+ }
+
+ @Test
+ public void testHttpServer() throws Exception {
+ InetSocketAddress addr = jn.getBoundHttpAddress();
+ assertTrue(addr.getPort() > 0);
+
+ String urlRoot = "http://localhost:" + addr.getPort();
+
+ // Check default servlets.
+ String pageContents = DFSTestUtil.urlGet(new URL(urlRoot + "/jmx"));
+ assertTrue("Bad contents: " + pageContents,
+ pageContents.contains(
+ "Hadoop:service=JournalNode,name=JvmMetrics"));
+
+ // Check JSP page.
+ pageContents = DFSTestUtil.urlGet(
+ new URL(urlRoot + "/journalstatus.jsp"));
+ assertTrue(pageContents.contains("JournalNode"));
+
+ // Create some edits on server side
+ byte[] EDITS_DATA = QJMTestUtil.createTxnData(1, 3);
+ IPCLoggerChannel ch = new IPCLoggerChannel(
+ conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress());
+ ch.newEpoch(1).get();
+ ch.setEpoch(1);
+ ch.startLogSegment(1).get();
+ ch.sendEdits(1L, 1, 3, EDITS_DATA).get();
+ ch.finalizeLogSegment(1, 3).get();
+
+ // Attempt to retrieve via HTTP, ensure we get the data back
+ // including the header we expected
+ byte[] retrievedViaHttp = DFSTestUtil.urlGetBytes(new URL(urlRoot +
+ "/getJournal?segmentTxId=1&jid=" + journalId));
+ byte[] expected = Bytes.concat(
+ Ints.toByteArray(HdfsConstants.LAYOUT_VERSION),
+ EDITS_DATA);
+
+ assertArrayEquals(expected, retrievedViaHttp);
+
+ // Attempt to fetch a non-existent file, check that we get an
+ // error status code
+ URL badUrl = new URL(urlRoot + "/getJournal?segmentTxId=12345&jid=" + journalId);
+ HttpURLConnection connection = (HttpURLConnection)badUrl.openConnection();
+ try {
+ assertEquals(404, connection.getResponseCode());
+ } finally {
+ connection.disconnect();
+ }
+ }
+
+ /**
+ * Test that the JournalNode performs correctly as a Paxos
+ * <em>Acceptor</em> process.
+ */
+ @Test
+ public void testAcceptRecoveryBehavior() throws Exception {
+ // We need to run newEpoch() first, or else we have no way to distinguish
+ // different proposals for the same decision.
+ try {
+ ch.prepareRecovery(1L).get();
+ fail("Did not throw IllegalState when trying to run paxos without an epoch");
+ } catch (ExecutionException ise) {
+ GenericTestUtils.assertExceptionContains("bad epoch", ise);
+ }
+
+ ch.newEpoch(1).get();
+ ch.setEpoch(1);
+
+ // prepare() with no previously accepted value and no logs present
+ PrepareRecoveryResponseProto prep = ch.prepareRecovery(1L).get();
+ System.err.println("Prep: " + prep);
+ assertFalse(prep.hasAcceptedInEpoch());
+ assertFalse(prep.hasSegmentState());
+
+ // Make a log segment, and prepare again -- this time should see the
+ // segment existing.
+ ch.startLogSegment(1L).get();
+ ch.sendEdits(1L, 1L, 1, QJMTestUtil.createTxnData(1, 1)).get();
+
+ prep = ch.prepareRecovery(1L).get();
+ System.err.println("Prep: " + prep);
+ assertFalse(prep.hasAcceptedInEpoch());
+ assertTrue(prep.hasSegmentState());
+
+ // accept() should save the accepted value in persistent storage
+ ch.acceptRecovery(prep.getSegmentState(), new URL("file:///dev/null")).get();
+
+ // So another prepare() call from a new epoch would return this value
+ ch.newEpoch(2);
+ ch.setEpoch(2);
+ prep = ch.prepareRecovery(1L).get();
+ assertEquals(1L, prep.getAcceptedInEpoch());
+ assertEquals(1L, prep.getSegmentState().getEndTxId());
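+ // This is the Paxos acceptor invariant under test: once a value is
+ // accepted in epoch 1, a prepare() from a later epoch must surface
+ // both the value and the epoch in which it was accepted.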
+
+ // A prepare() or accept() call from an earlier epoch should now be rejected
+ ch.setEpoch(1);
+ try {
+ ch.prepareRecovery(1L).get();
+ fail("prepare from earlier epoch not rejected");
+ } catch (ExecutionException ioe) {
+ GenericTestUtils.assertExceptionContains(
+ "epoch 1 is less than the last promised epoch 2",
+ ioe);
+ }
+ try {
+ ch.acceptRecovery(prep.getSegmentState(), new URL("file:///dev/null")).get();
+ fail("accept from earlier epoch not rejected");
+ } catch (ExecutionException ioe) {
+ GenericTestUtils.assertExceptionContains(
+ "epoch 1 is less than the last promised epoch 2",
+ ioe);
+ }
+ }
+
+ @Test
+ public void testFailToStartWithBadConfig() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, "non-absolute-path");
+ assertJNFailsToStart(conf, "should be an absolute path");
+
+ // Existing file which is not a directory
+ conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, "/dev/null");
+ assertJNFailsToStart(conf, "is not a directory");
+
+ // Directory which cannot be created
+ conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, "/proc/does-not-exist");
+ assertJNFailsToStart(conf, "Could not create");
+
+ }
+
+ private static void assertJNFailsToStart(Configuration conf,
+ String errString) {
+ try {
+ JournalNode jn = new JournalNode();
+ jn.setConf(conf);
+ jn.start();
+ fail("JournalNode started despite an invalid edits-dir config: "
+ + errString);
+ } catch (Exception e) {
+ GenericTestUtils.assertExceptionContains(errString, e);
+ }
+ }
+
+ /**
+ * Simple test of how fast the code path is to write edits.
+ * This isn't a true unit test, but can be run manually to
+ * check performance.
+ *
+ * At the time of development, this test ran in ~4sec on an
+ * SSD-enabled laptop (1.8ms/batch).
+ */
+ @Test(timeout=100000)
+ public void testPerformance() throws Exception {
+ doPerfTest(8192, 1024); // 8MB
+ }
+
+ private void doPerfTest(int editsSize, int numEdits) throws Exception {
+ byte[] data = new byte[editsSize];
+ ch.newEpoch(1).get();
+ ch.setEpoch(1);
+ ch.startLogSegment(1).get();
+
+ Stopwatch sw = new Stopwatch().start();
+ for (int i = 1; i <= numEdits; i++) {
+ ch.sendEdits(1L, i, 1, data).get();
+ }
+ long time = sw.elapsedMillis();
+
+ System.err.println("Wrote " + numEdits + " batches of " + editsSize +
+ " bytes in " + time + "ms");
+ float avgRtt = (float)time/(float)numEdits;
+ long throughput = ((long)numEdits * editsSize * 1000L)/time;
+ System.err.println("Time per batch: " + avgRtt + "ms");
+ System.err.println("Throughput: " + throughput + " bytes/sec");
+ }
+}
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java Wed Dec 5 19:22:17 2012
@@ -74,6 +74,7 @@ import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Assume;
+import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -87,14 +88,6 @@ public class TestBlockToken {
public static final Log LOG = LogFactory.getLog(TestBlockToken.class);
private static final String ADDRESS = "0.0.0.0";
- static final String SERVER_PRINCIPAL_KEY = "test.ipc.server.principal";
- private static Configuration conf;
- static {
- conf = new Configuration();
- conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
- UserGroupInformation.setConfiguration(conf);
- }
-
static {
((Log4JLogger) Client.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger) Server.LOG).getLogger().setLevel(Level.ALL);
@@ -111,6 +104,13 @@ public class TestBlockToken {
ExtendedBlock block1 = new ExtendedBlock("0", 0L);
ExtendedBlock block2 = new ExtendedBlock("10", 10L);
ExtendedBlock block3 = new ExtendedBlock("-10", -108L);
+
+ @Before
+ public void disableKerberos() {
+ Configuration conf = new Configuration();
+ conf.set(HADOOP_SECURITY_AUTHENTICATION, "simple");
+ UserGroupInformation.setConfiguration(conf);
+ }
private static class GetLengthAnswer implements
Answer<GetReplicaVisibleLengthResponseProto> {
@@ -215,8 +215,9 @@ public class TestBlockToken {
tokenGenerationAndVerification(masterHandler, slaveHandler);
}
- private Server createMockDatanode(BlockTokenSecretManager sm,
- Token<BlockTokenIdentifier> token) throws IOException, ServiceException {
+ private static Server createMockDatanode(BlockTokenSecretManager sm,
+ Token<BlockTokenIdentifier> token, Configuration conf)
+ throws IOException, ServiceException {
ClientDatanodeProtocolPB mockDN = mock(ClientDatanodeProtocolPB.class);
BlockTokenIdentifier id = sm.createIdentifier();
@@ -237,12 +238,16 @@ public class TestBlockToken {
@Test
public void testBlockTokenRpc() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+
BlockTokenSecretManager sm = new BlockTokenSecretManager(
blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
Token<BlockTokenIdentifier> token = sm.generateToken(block3,
EnumSet.allOf(BlockTokenSecretManager.AccessMode.class));
- final Server server = createMockDatanode(sm, token);
+ final Server server = createMockDatanode(sm, token, conf);
server.start();
@@ -271,13 +276,17 @@ public class TestBlockToken {
*/
@Test
public void testBlockTokenRpcLeak() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+
Assume.assumeTrue(FD_DIR.exists());
BlockTokenSecretManager sm = new BlockTokenSecretManager(
blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
Token<BlockTokenIdentifier> token = sm.generateToken(block3,
EnumSet.allOf(BlockTokenSecretManager.AccessMode.class));
- final Server server = createMockDatanode(sm, token);
+ final Server server = createMockDatanode(sm, token, conf);
server.start();
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java Wed Dec 5 19:22:17 2012
@@ -17,11 +17,16 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.spy;
+
import java.io.File;
import java.io.IOException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -182,6 +187,15 @@ public class NameNodeAdapter {
}
}
+ public static FSEditLogOp createMkdirOp(String path) {
+ MkdirOp op = MkdirOp.getInstance(new FSEditLogOp.OpInstanceCache())
+ .setPath(path)
+ .setTimestamp(0)
+ .setPermissionStatus(new PermissionStatus(
+ "testuser", "testgroup", FsPermission.getDefault()));
+ return op;
+ }
+
/**
* @return the number of blocks marked safe by safemode, or -1
* if safemode is not running.
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java Wed Dec 5 19:22:17 2012
@@ -1220,7 +1220,7 @@ public class TestEditLog {
elfos.create();
elfos.writeRaw(garbage, 0, garbage.length);
elfos.setReadyToFlush();
- elfos.flushAndSync();
+ elfos.flushAndSync(true);
elfos.close();
elfos = null;
file = new File(TEST_LOG_NAME);
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java Wed Dec 5 19:22:17 2012
@@ -55,7 +55,7 @@ public class TestEditLogFileOutputStream
static void flushAndCheckLength(EditLogFileOutputStream elos,
long expectedLength) throws IOException {
elos.setReadyToFlush();
- elos.flushAndSync();
+ elos.flushAndSync(true);
assertEquals(expectedLength, elos.getFile().length());
}
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java Wed Dec 5 19:22:17 2012
@@ -74,7 +74,7 @@ public class TestNameNodeRecovery {
elts.addTransactionsToLog(elfos, cache);
elfos.setReadyToFlush();
- elfos.flushAndSync();
+ elfos.flushAndSync(true);
elfos.close();
elfos = null;
file = new File(TEST_LOG_NAME);
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java Wed Dec 5 19:22:17 2012
@@ -166,13 +166,13 @@ public class TestInitializeSharedEdits {
}
@Test
- public void testDontOverWriteExistingDir() {
+ public void testDontOverWriteExistingDir() throws IOException {
assertFalse(NameNode.initializeSharedEdits(conf, false));
assertTrue(NameNode.initializeSharedEdits(conf, false));
}
@Test
- public void testInitializeSharedEditsConfiguresGenericConfKeys() {
+ public void testInitializeSharedEditsConfiguresGenericConfKeys() throws IOException {
Configuration conf = new Configuration();
conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns1");
conf.set(DFSUtil.addKeySuffixes(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX,
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestBestEffortLongFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestBestEffortLongFile.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestBestEffortLongFile.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestBestEffortLongFile.java Wed Dec 5 19:22:17 2012
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestBestEffortLongFile {
+
+ private static final File FILE = new File(MiniDFSCluster.getBaseDirectory() +
+ File.separatorChar + "TestBestEffortLongFile");
+
+ @Before
+ public void cleanup() {
+ if (FILE.exists()) {
+ assertTrue(FILE.delete());
+ }
+ FILE.getParentFile().mkdirs();
+ }
+
+ @Test
+ public void testGetSet() throws IOException {
+ BestEffortLongFile f = new BestEffortLongFile(FILE, 12345L);
+ try {
+ // Before the file exists, should return default.
+ assertEquals(12345L, f.get());
+
+ // And first access should open it.
+ assertTrue(FILE.exists());
+
+ Random r = new Random();
+ for (int i = 0; i < 100; i++) {
+ long newVal = r.nextLong();
+ // Changing the value should be reflected in the next get() call.
+ f.set(newVal);
+ assertEquals(newVal, f.get());
+
+ // And should be reflected in a new instance (i.e. it actually got
+ // written to the file).
+ BestEffortLongFile f2 = new BestEffortLongFile(FILE, 999L);
+ try {
+ assertEquals(newVal, f2.get());
+ } finally {
+ IOUtils.closeStream(f2);
+ }
+ }
+ } finally {
+ IOUtils.closeStream(f);
+ }
+ }
+
+ @Test
+ public void testTruncatedFileReturnsDefault() throws IOException {
+ assertTrue(FILE.createNewFile());
+ assertEquals(0, FILE.length());
+ BestEffortLongFile f = new BestEffortLongFile(FILE, 12345L);
+ try {
+ assertEquals(12345L, f.get());
+ } finally {
+ f.close();
+ }
+ }
+}