You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/06/06 02:18:04 UTC
svn commit: r1346682 [2/9] - in
/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project: ./ hadoop-hdfs-httpfs/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/
hadoop-hdfs/ hadoop-hdfs/dev-support/ hadoop-hdfs/src/contrib/bkjournal/
ha...
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java Wed Jun 6 00:17:38 2012
@@ -18,53 +18,31 @@
package org.apache.hadoop.contrib.bkjournal;
import static org.junit.Assert.*;
-
-import java.net.URI;
-import java.util.Collections;
-import java.util.Arrays;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.bookkeeper.util.LocalBookKeeper;
-
-import java.io.RandomAccessFile;
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.security.SecurityUtil;
+import static org.mockito.Mockito.spy;
import org.junit.Test;
import org.junit.Before;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.AfterClass;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.setupEdits;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.WatchedEvent;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-
-import com.google.common.collect.ImmutableList;
-
-import java.util.zip.CheckedInputStream;
-import java.util.zip.Checksum;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -73,79 +51,26 @@ public class TestBookKeeperJournalManage
static final Log LOG = LogFactory.getLog(TestBookKeeperJournalManager.class);
private static final long DEFAULT_SEGMENT_SIZE = 1000;
- private static final String zkEnsemble = "localhost:2181";
- private static Thread bkthread;
protected static Configuration conf = new Configuration();
private ZooKeeper zkc;
-
- private static ZooKeeper connectZooKeeper(String ensemble)
- throws IOException, KeeperException, InterruptedException {
- final CountDownLatch latch = new CountDownLatch(1);
-
- ZooKeeper zkc = new ZooKeeper(zkEnsemble, 3600, new Watcher() {
- public void process(WatchedEvent event) {
- if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
- latch.countDown();
- }
- }
- });
- if (!latch.await(3, TimeUnit.SECONDS)) {
- throw new IOException("Zookeeper took too long to connect");
- }
- return zkc;
- }
+ private static BKJMUtil bkutil;
+ static int numBookies = 3;
@BeforeClass
public static void setupBookkeeper() throws Exception {
- final int numBookies = 5;
- bkthread = new Thread() {
- public void run() {
- try {
- String[] args = new String[1];
- args[0] = String.valueOf(numBookies);
- LOG.info("Starting bk");
- LocalBookKeeper.main(args);
- } catch (InterruptedException e) {
- // go away quietly
- } catch (Exception e) {
- LOG.error("Error starting local bk", e);
- }
- }
- };
- bkthread.start();
-
- if (!LocalBookKeeper.waitForServerUp(zkEnsemble, 10000)) {
- throw new Exception("Error starting zookeeper/bookkeeper");
- }
+ bkutil = new BKJMUtil(numBookies);
+ bkutil.start();
+ }
- ZooKeeper zkc = connectZooKeeper(zkEnsemble);
- try {
- boolean up = false;
- for (int i = 0; i < 10; i++) {
- try {
- List<String> children = zkc.getChildren("/ledgers/available",
- false);
- if (children.size() == numBookies) {
- up = true;
- break;
- }
- } catch (KeeperException e) {
- // ignore
- }
- Thread.sleep(1000);
- }
- if (!up) {
- throw new IOException("Not enough bookies started");
- }
- } finally {
- zkc.close();
- }
+ @AfterClass
+ public static void teardownBookkeeper() throws Exception {
+ bkutil.teardown();
}
-
+
@Before
public void setup() throws Exception {
- zkc = connectZooKeeper(zkEnsemble);
+ zkc = BKJMUtil.connectZooKeeper();
}
@After
@@ -153,19 +78,10 @@ public class TestBookKeeperJournalManage
zkc.close();
}
- @AfterClass
- public static void teardownBookkeeper() throws Exception {
- if (bkthread != null) {
- bkthread.interrupt();
- bkthread.join();
- }
- }
-
@Test
public void testSimpleWrite() throws Exception {
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplewrite"));
- long txid = 1;
+ BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"));
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1 ; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@@ -178,14 +94,13 @@ public class TestBookKeeperJournalManage
String zkpath = bkjm.finalizedLedgerZNode(1, 100);
assertNotNull(zkc.exists(zkpath, false));
- assertNull(zkc.exists(bkjm.inprogressZNode(), false));
+ assertNull(zkc.exists(bkjm.inprogressZNode(1), false));
}
@Test
public void testNumberOfTransactions() throws Exception {
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-txncount"));
- long txid = 1;
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+ BKJMUtil.createJournalURI("/hdfsjournal-txncount"));
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1 ; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@@ -201,8 +116,8 @@ public class TestBookKeeperJournalManage
@Test
public void testNumberOfTransactionsWithGaps() throws Exception {
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-gaps"));
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+ BKJMUtil.createJournalURI("/hdfsjournal-gaps"));
long txid = 1;
for (long i = 0; i < 3; i++) {
long start = txid;
@@ -214,9 +129,11 @@ public class TestBookKeeperJournalManage
}
out.close();
bkjm.finalizeLogSegment(start, txid-1);
- assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false));
+ assertNotNull(
+ zkc.exists(bkjm.finalizedLedgerZNode(start, txid-1), false));
}
- zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1, DEFAULT_SEGMENT_SIZE*2), -1);
+ zkc.delete(bkjm.finalizedLedgerZNode(DEFAULT_SEGMENT_SIZE+1,
+ DEFAULT_SEGMENT_SIZE*2), -1);
long numTrans = bkjm.getNumberOfTransactions(1, true);
assertEquals(DEFAULT_SEGMENT_SIZE, numTrans);
@@ -234,8 +151,8 @@ public class TestBookKeeperJournalManage
@Test
public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception {
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-inprogressAtEnd"));
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+ BKJMUtil.createJournalURI("/hdfsjournal-inprogressAtEnd"));
long txid = 1;
for (long i = 0; i < 3; i++) {
long start = txid;
@@ -248,7 +165,8 @@ public class TestBookKeeperJournalManage
out.close();
bkjm.finalizeLogSegment(start, (txid-1));
- assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(start, (txid-1)), false));
+ assertNotNull(
+ zkc.exists(bkjm.finalizedLedgerZNode(start, (txid-1)), false));
}
long start = txid;
EditLogOutputStream out = bkjm.startLogSegment(start);
@@ -272,8 +190,8 @@ public class TestBookKeeperJournalManage
*/
@Test
public void testWriteRestartFrom1() throws Exception {
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-restartFrom1"));
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+ BKJMUtil.createJournalURI("/hdfsjournal-restartFrom1"));
long txid = 1;
long start = txid;
EditLogOutputStream out = bkjm.startLogSegment(txid);
@@ -327,25 +245,26 @@ public class TestBookKeeperJournalManage
@Test
public void testTwoWriters() throws Exception {
long start = 1;
- BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf,
- URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter"));
- BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf,
- URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-dualWriter"));
+ BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf,
+ BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"));
+ BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf,
+ BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"));
EditLogOutputStream out1 = bkjm1.startLogSegment(start);
try {
- EditLogOutputStream out2 = bkjm2.startLogSegment(start);
+ bkjm2.startLogSegment(start);
fail("Shouldn't have been able to open the second writer");
} catch (IOException ioe) {
LOG.info("Caught exception as expected", ioe);
+ }finally{
+ out1.close();
}
}
@Test
public void testSimpleRead() throws Exception {
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simpleread"));
- long txid = 1;
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+ BKJMUtil.createJournalURI("/hdfsjournal-simpleread"));
final long numTransactions = 10000;
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1 ; i <= numTransactions; i++) {
@@ -368,10 +287,9 @@ public class TestBookKeeperJournalManage
@Test
public void testSimpleRecovery() throws Exception {
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- URI.create("bookkeeper://" + zkEnsemble + "/hdfsjournal-simplerecovery"));
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+ BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery"));
EditLogOutputStream out = bkjm.startLogSegment(1);
- long txid = 1;
for (long i = 1 ; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
op.setTransactionId(i);
@@ -385,11 +303,372 @@ public class TestBookKeeperJournalManage
assertNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false));
- assertNotNull(zkc.exists(bkjm.inprogressZNode(), false));
+ assertNotNull(zkc.exists(bkjm.inprogressZNode(1), false));
bkjm.recoverUnfinalizedSegments();
assertNotNull(zkc.exists(bkjm.finalizedLedgerZNode(1, 100), false));
- assertNull(zkc.exists(bkjm.inprogressZNode(), false));
+ assertNull(zkc.exists(bkjm.inprogressZNode(1), false));
+ }
+
+ /**
+ * Test that if enough bookies fail to prevent an ensemble,
+ * writes the bookkeeper will fail. Test that when once again
+ * an ensemble is available, it can continue to write.
+ */
+ @Test
+ public void testAllBookieFailure() throws Exception {
+ BookieServer bookieToFail = bkutil.newBookie();
+ BookieServer replacementBookie = null;
+
+ try {
+ int ensembleSize = numBookies + 1;
+ assertEquals("New bookie didn't start",
+ ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
+
+ // ensure that the journal manager has to use all bookies,
+ // so that a failure will fail the journal manager
+ Configuration conf = new Configuration();
+ conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
+ ensembleSize);
+ conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
+ ensembleSize);
+ long txid = 1;
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+ BKJMUtil.createJournalURI("/hdfsjournal-allbookiefailure"));
+ EditLogOutputStream out = bkjm.startLogSegment(txid);
+
+ for (long i = 1 ; i <= 3; i++) {
+ FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+ op.setTransactionId(txid++);
+ out.write(op);
+ }
+ out.setReadyToFlush();
+ out.flush();
+ bookieToFail.shutdown();
+ assertEquals("New bookie didn't die",
+ numBookies, bkutil.checkBookiesUp(numBookies, 10));
+
+ try {
+ for (long i = 1 ; i <= 3; i++) {
+ FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+ op.setTransactionId(txid++);
+ out.write(op);
+ }
+ out.setReadyToFlush();
+ out.flush();
+ fail("should not get to this stage");
+ } catch (IOException ioe) {
+ LOG.debug("Error writing to bookkeeper", ioe);
+ assertTrue("Invalid exception message",
+ ioe.getMessage().contains("Failed to write to bookkeeper"));
+ }
+ replacementBookie = bkutil.newBookie();
+
+ assertEquals("New bookie didn't start",
+ numBookies+1, bkutil.checkBookiesUp(numBookies+1, 10));
+ bkjm.recoverUnfinalizedSegments();
+ out = bkjm.startLogSegment(txid);
+ for (long i = 1 ; i <= 3; i++) {
+ FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+ op.setTransactionId(txid++);
+ out.write(op);
+ }
+
+ out.setReadyToFlush();
+ out.flush();
+
+ } catch (Exception e) {
+ LOG.error("Exception in test", e);
+ throw e;
+ } finally {
+ if (replacementBookie != null) {
+ replacementBookie.shutdown();
+ }
+ bookieToFail.shutdown();
+
+ if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
+ LOG.warn("Not all bookies from this test shut down, expect errors");
+ }
+ }
+ }
+
+ /**
+ * Test that a BookKeeper JM can continue to work across the
+ * failure of a bookie. This should be handled transparently
+ * by bookkeeper.
+ */
+ @Test
+ public void testOneBookieFailure() throws Exception {
+ BookieServer bookieToFail = bkutil.newBookie();
+ BookieServer replacementBookie = null;
+
+ try {
+ int ensembleSize = numBookies + 1;
+ assertEquals("New bookie didn't start",
+ ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
+
+ // ensure that the journal manager has to use all bookies,
+ // so that a failure will fail the journal manager
+ Configuration conf = new Configuration();
+ conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
+ ensembleSize);
+ conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
+ ensembleSize);
+ long txid = 1;
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
+ BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"));
+ EditLogOutputStream out = bkjm.startLogSegment(txid);
+ for (long i = 1 ; i <= 3; i++) {
+ FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+ op.setTransactionId(txid++);
+ out.write(op);
+ }
+ out.setReadyToFlush();
+ out.flush();
+
+ replacementBookie = bkutil.newBookie();
+ assertEquals("replacement bookie didn't start",
+ ensembleSize+1, bkutil.checkBookiesUp(ensembleSize+1, 10));
+ bookieToFail.shutdown();
+ assertEquals("New bookie didn't die",
+ ensembleSize, bkutil.checkBookiesUp(ensembleSize, 10));
+
+ for (long i = 1 ; i <= 3; i++) {
+ FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+ op.setTransactionId(txid++);
+ out.write(op);
+ }
+ out.setReadyToFlush();
+ out.flush();
+ } catch (Exception e) {
+ LOG.error("Exception in test", e);
+ throw e;
+ } finally {
+ if (replacementBookie != null) {
+ replacementBookie.shutdown();
+ }
+ bookieToFail.shutdown();
+
+ if (bkutil.checkBookiesUp(numBookies, 30) != numBookies) {
+ LOG.warn("Not all bookies from this test shut down, expect errors");
+ }
+ }
+ }
+
+ /**
+ * If a journal manager has an empty inprogress node, ensure that we throw an
+ * error, as this should not be possible, and some third party has corrupted
+ * the zookeeper state
+ */
+ @Test
+ public void testEmptyInprogressNode() throws Exception {
+ URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogress");
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+
+ EditLogOutputStream out = bkjm.startLogSegment(1);
+ for (long i = 1; i <= 100; i++) {
+ FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+ op.setTransactionId(i);
+ out.write(op);
+ }
+ out.close();
+ bkjm.finalizeLogSegment(1, 100);
+
+ out = bkjm.startLogSegment(101);
+ out.close();
+ bkjm.close();
+ String inprogressZNode = bkjm.inprogressZNode(101);
+ zkc.setData(inprogressZNode, new byte[0], -1);
+
+ bkjm = new BookKeeperJournalManager(conf, uri);
+ try {
+ bkjm.recoverUnfinalizedSegments();
+ fail("Should have failed. There should be no way of creating"
+ + " an empty inprogess znode");
+ } catch (IOException e) {
+ // correct behaviour
+ assertTrue("Exception different than expected", e.getMessage().contains(
+ "Invalid ledger entry,"));
+ } finally {
+ bkjm.close();
+ }
+ }
+
+ /**
+ * If a journal manager has an corrupt inprogress node, ensure that we throw
+ * an error, as this should not be possible, and some third party has
+ * corrupted the zookeeper state
+ */
+ @Test
+ public void testCorruptInprogressNode() throws Exception {
+ URI uri = BKJMUtil.createJournalURI("/hdfsjournal-corruptInprogress");
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+
+ EditLogOutputStream out = bkjm.startLogSegment(1);
+ for (long i = 1; i <= 100; i++) {
+ FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+ op.setTransactionId(i);
+ out.write(op);
+ }
+ out.close();
+ bkjm.finalizeLogSegment(1, 100);
+
+ out = bkjm.startLogSegment(101);
+ out.close();
+ bkjm.close();
+
+ String inprogressZNode = bkjm.inprogressZNode(101);
+ zkc.setData(inprogressZNode, "WholeLottaJunk".getBytes(), -1);
+
+ bkjm = new BookKeeperJournalManager(conf, uri);
+ try {
+ bkjm.recoverUnfinalizedSegments();
+ fail("Should have failed. There should be no way of creating"
+ + " an empty inprogess znode");
+ } catch (IOException e) {
+ // correct behaviour
+ assertTrue("Exception different than expected", e.getMessage().contains(
+ "Invalid ledger entry,"));
+
+ } finally {
+ bkjm.close();
+ }
+ }
+
+ /**
+ * Cases can occur where we create a segment but crash before we even have the
+ * chance to write the START_SEGMENT op. If this occurs we should warn, but
+ * load as normal
+ */
+ @Test
+ public void testEmptyInprogressLedger() throws Exception {
+ URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogressLedger");
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+
+ EditLogOutputStream out = bkjm.startLogSegment(1);
+ for (long i = 1; i <= 100; i++) {
+ FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+ op.setTransactionId(i);
+ out.write(op);
+ }
+ out.close();
+ bkjm.finalizeLogSegment(1, 100);
+
+ out = bkjm.startLogSegment(101);
+ out.close();
+ bkjm.close();
+
+ bkjm = new BookKeeperJournalManager(conf, uri);
+ bkjm.recoverUnfinalizedSegments();
+ out = bkjm.startLogSegment(101);
+ for (long i = 1; i <= 100; i++) {
+ FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+ op.setTransactionId(i);
+ out.write(op);
+ }
+ out.close();
+ bkjm.finalizeLogSegment(101, 200);
+
+ bkjm.close();
+ }
+
+ /**
+ * Test that if we fail between finalizing an inprogress and deleting the
+ * corresponding inprogress znode.
+ */
+ @Test
+ public void testRefinalizeAlreadyFinalizedInprogress() throws Exception {
+ URI uri = BKJMUtil
+ .createJournalURI("/hdfsjournal-refinalizeInprogressLedger");
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+
+ EditLogOutputStream out = bkjm.startLogSegment(1);
+ for (long i = 1; i <= 100; i++) {
+ FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+ op.setTransactionId(i);
+ out.write(op);
+ }
+ out.close();
+ bkjm.close();
+
+ String inprogressZNode = bkjm.inprogressZNode(1);
+ String finalizedZNode = bkjm.finalizedLedgerZNode(1, 100);
+ assertNotNull("inprogress znode doesn't exist", zkc.exists(inprogressZNode,
+ null));
+ assertNull("finalized znode exists", zkc.exists(finalizedZNode, null));
+
+ byte[] inprogressData = zkc.getData(inprogressZNode, false, null);
+
+ // finalize
+ bkjm = new BookKeeperJournalManager(conf, uri);
+ bkjm.recoverUnfinalizedSegments();
+ bkjm.close();
+
+ assertNull("inprogress znode exists", zkc.exists(inprogressZNode, null));
+ assertNotNull("finalized znode doesn't exist", zkc.exists(finalizedZNode,
+ null));
+
+ zkc.create(inprogressZNode, inprogressData, Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ // should work fine
+ bkjm = new BookKeeperJournalManager(conf, uri);
+ bkjm.recoverUnfinalizedSegments();
+ bkjm.close();
+ }
+
+ /**
+ * Tests that the edit log file meta data reading from ZooKeeper should be
+ * able to handle the NoNodeException. bkjm.getInputStream(fromTxId,
+ * inProgressOk) should suppress the NoNodeException and continue. HDFS-3441.
+ */
+ @Test
+ public void testEditLogFileNotExistsWhenReadingMetadata() throws Exception {
+ URI uri = BKJMUtil.createJournalURI("/hdfsjournal-editlogfile");
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+ try {
+ // start new inprogress log segment with txid=1
+ // and write transactions till txid=50
+ String zkpath1 = startAndFinalizeLogSegment(bkjm, 1, 50);
+
+ // start new inprogress log segment with txid=51
+ // and write transactions till txid=100
+ String zkpath2 = startAndFinalizeLogSegment(bkjm, 51, 100);
+
+ // read the metadata from ZK. Here simulating the situation
+ // when reading,the edit log metadata can be removed by purger thread.
+ ZooKeeper zkspy = spy(BKJMUtil.connectZooKeeper());
+ bkjm.setZooKeeper(zkspy);
+ Mockito.doThrow(
+ new KeeperException.NoNodeException(zkpath2 + " doesn't exists"))
+ .when(zkspy).getData(zkpath2, false, null);
+
+ List<EditLogLedgerMetadata> ledgerList = bkjm.getLedgerList(false);
+ assertEquals("List contains the metadata of non exists path.", 1,
+ ledgerList.size());
+ assertEquals("LogLedgerMetadata contains wrong zk paths.", zkpath1,
+ ledgerList.get(0).getZkPath());
+ } finally {
+ bkjm.close();
+ }
+ }
+
+ private String startAndFinalizeLogSegment(BookKeeperJournalManager bkjm,
+ int startTxid, int endTxid) throws IOException, KeeperException,
+ InterruptedException {
+ EditLogOutputStream out = bkjm.startLogSegment(startTxid);
+ for (long i = startTxid; i <= endTxid; i++) {
+ FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
+ op.setTransactionId(i);
+ out.write(op);
+ }
+ out.close();
+ // finalize the inprogress_1 log segment.
+ bkjm.finalizeLogSegment(startTxid, endTxid);
+ String zkpath1 = bkjm.finalizedLedgerZNode(startTxid, endTxid);
+ assertNotNull(zkc.exists(zkpath1, false));
+ assertNull(zkc.exists(bkjm.inprogressZNode(startTxid), false));
+ return zkpath1;
}
}
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java Wed Jun 6 00:17:38 2012
@@ -34,6 +34,11 @@ public class FSEditLogTestUtil {
public static long countTransactionsInStream(EditLogInputStream in)
throws IOException {
FSEditLogLoader.EditLogValidation validation = FSEditLogLoader.validateEditLog(in);
- return validation.getNumTransactions();
+ return (validation.getEndTxId() - in.getFirstTxId()) + 1;
+ }
+
+ public static void setRuntimeForEditLog(NameNode nn, Runtime rt) {
+ nn.setRuntimeForTesting(rt);
+ nn.getFSImage().getEditLog().setRuntimeForTesting(rt);
}
}
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/pom.xml?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/pom.xml (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/pom.xml Wed Jun 6 00:17:38 2012
@@ -14,7 +14,10 @@
-->
-<project>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hadoop</groupId>
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/src/fuse_impls_truncate.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/src/fuse_impls_truncate.c?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/src/fuse_impls_truncate.c (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/contrib/fuse-dfs/src/fuse_impls_truncate.c Wed Jun 6 00:17:38 2012
@@ -37,7 +37,7 @@ int dfs_truncate(const char *path, off_t
assert(dfs);
if (size != 0) {
- return -ENOTSUP;
+ return 0;
}
int ret = dfs_unlink(path);
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs Wed Jun 6 00:17:38 2012
@@ -30,6 +30,7 @@ function print_usage(){
echo " namenode -format format the DFS filesystem"
echo " secondarynamenode run the DFS secondary namenode"
echo " namenode run the DFS namenode"
+ echo " zkfc run the ZK Failover Controller daemon"
echo " datanode run a DFS datanode"
echo " dfsadmin run a DFS admin client"
echo " haadmin run a DFS HA admin client"
@@ -56,21 +57,29 @@ shift
# Determine if we're starting a secure datanode, and if so, redefine appropriate variables
if [ "$COMMAND" == "datanode" ] && [ "$EUID" -eq 0 ] && [ -n "$HADOOP_SECURE_DN_USER" ]; then
- if [ -n "$HADOOP_SECURE_DN_PID_DIR" ]; then
- HADOOP_PID_DIR=$HADOOP_SECURE_DN_PID_DIR
- fi
-
- if [ -n "$HADOOP_SECURE_DN_LOG_DIR" ]; then
- HADOOP_LOG_DIR=$HADOOP_SECURE_DN_LOG_DIR
+ if [ -n "$JSVC_HOME" ]; then
+ if [ -n "$HADOOP_SECURE_DN_PID_DIR" ]; then
+ HADOOP_PID_DIR=$HADOOP_SECURE_DN_PID_DIR
+ fi
+
+ if [ -n "$HADOOP_SECURE_DN_LOG_DIR" ]; then
+ HADOOP_LOG_DIR=$HADOOP_SECURE_DN_LOG_DIR
+ fi
+
+ HADOOP_IDENT_STRING=$HADOOP_SECURE_DN_USER
+ starting_secure_dn="true"
+ else
+ echo "It looks like you're trying to start a secure DN, but \$JSVC_HOME"\
+ "isn't set. Falling back to starting insecure DN."
fi
-
- HADOOP_IDENT_STRING=$HADOOP_SECURE_DN_USER
- starting_secure_dn="true"
fi
if [ "$COMMAND" = "namenode" ] ; then
CLASS='org.apache.hadoop.hdfs.server.namenode.NameNode'
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_NAMENODE_OPTS"
+elif [ "$COMMAND" = "zkfc" ] ; then
+ CLASS='org.apache.hadoop.hdfs.tools.DFSZKFailoverController'
+ HADOOP_OPTS="$HADOOP_OPTS $HADOOP_ZKFC_OPTS"
elif [ "$COMMAND" = "secondarynamenode" ] ; then
CLASS='org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode'
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_SECONDARYNAMENODE_OPTS"
@@ -125,12 +134,12 @@ if [ "$starting_secure_dn" = "true" ]; t
if [ "$HADOOP_PID_DIR" = "" ]; then
HADOOP_SECURE_DN_PID="/tmp/hadoop_secure_dn.pid"
else
- HADOOP_SECURE_DN_PID="$HADOOP_PID_DIR/hadoop_secure_dn.pid"
+ HADOOP_SECURE_DN_PID="$HADOOP_PID_DIR/hadoop_secure_dn.pid"
fi
JSVC=$JSVC_HOME/jsvc
if [ ! -f $JSVC ]; then
- echo "JSVC_HOME is not set correctly so jsvc can not be found. Jsvc is required to run secure datanodes. "
+ echo "JSVC_HOME is not set correctly so jsvc cannot be found. Jsvc is required to run secure datanodes. "
echo "Please download and install jsvc from http://archive.apache.org/dist/commons/daemon/binaries/ "\
"and set JSVC_HOME to the directory containing the jsvc binary."
exit
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.sh
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.sh?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.sh (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.sh Wed Jun 6 00:17:38 2012
@@ -85,4 +85,15 @@ if [ -n "$SECONDARY_NAMENODES" ]; then
--script "$bin/hdfs" start secondarynamenode
fi
+#---------------------------------------------------------
+# ZK Failover controllers, if auto-HA is enabled
+AUTOHA_ENABLED=$($HADOOP_PREFIX/bin/hdfs getconf -confKey dfs.ha.automatic-failover.enabled)
+if [ "$(echo "$AUTOHA_ENABLED" | tr A-Z a-z)" = "true" ]; then
+ echo "Starting ZK Failover Controllers on NN hosts [$NAMENODES]"
+ "$HADOOP_PREFIX/sbin/hadoop-daemons.sh" \
+ --config "$HADOOP_CONF_DIR" \
+ --hostnames "$NAMENODES" \
+ --script "$bin/hdfs" start zkfc
+fi
+
# eof
Propchange: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
Merged /hadoop/common/branches/HDFS-3042/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1306184-1342109
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1337003-1346681
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Wed Jun 6 00:17:38 2012
@@ -107,6 +107,8 @@ public class DFSConfigKeys extends Commo
public static final long DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT = 40000;
public static final String DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY = "dfs.namenode.heartbeat.recheck-interval";
public static final int DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT = 5*60*1000;
+ public static final String DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY = "dfs.namenode.tolerate.heartbeat.multiplier";
+ public static final int DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT = 4;
public static final String DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY = "dfs.client.https.keystore.resource";
public static final String DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_DEFAULT = "ssl-client.xml";
public static final String DFS_CLIENT_HTTPS_NEED_AUTH_KEY = "dfs.client.https.need-auth";
@@ -334,8 +336,8 @@ public class DFSConfigKeys extends Commo
public static final String DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY = "dfs.namenode.name.cache.threshold";
public static final int DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT = 10;
- public static final String DFS_FEDERATION_NAMESERVICES = "dfs.federation.nameservices";
- public static final String DFS_FEDERATION_NAMESERVICE_ID = "dfs.federation.nameservice.id";
+ public static final String DFS_NAMESERVICES = "dfs.nameservices";
+ public static final String DFS_NAMESERVICE_ID = "dfs.nameservice.id";
public static final String DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY = "dfs.namenode.resource.check.interval";
public static final int DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT = 5000;
public static final String DFS_NAMENODE_DU_RESERVED_KEY = "dfs.namenode.resource.du.reserved";
@@ -358,4 +360,8 @@ public class DFSConfigKeys extends Commo
public static final String DFS_HA_TAILEDITS_PERIOD_KEY = "dfs.ha.tail-edits.period";
public static final int DFS_HA_TAILEDITS_PERIOD_DEFAULT = 60; // 1m
public static final String DFS_HA_FENCE_METHODS_KEY = "dfs.ha.fencing.methods";
+ public static final String DFS_HA_AUTO_FAILOVER_ENABLED_KEY = "dfs.ha.automatic-failover.enabled";
+ public static final boolean DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT = false;
+ public static final String DFS_HA_ZKFC_PORT_KEY = "dfs.ha.zkfc.port";
+ public static final int DFS_HA_ZKFC_PORT_DEFAULT = 8019;
}
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Wed Jun 6 00:17:38 2012
@@ -129,11 +129,13 @@ public class DFSOutputStream extends FSO
private long initialFileSize = 0; // at time of file open
private Progressable progress;
private final short blockReplication; // replication factor of file
+ private boolean shouldSyncBlock = false; // force blocks to disk upon close
private class Packet {
long seqno; // sequencenumber of buffer in block
long offsetInBlock; // offset in block
- boolean lastPacketInBlock; // is this the last packet in block?
+ private boolean lastPacketInBlock; // is this the last packet in block?
+ boolean syncBlock; // this packet forces the current block to disk
int numChunks; // number of chunks currently in packet
int maxChunks; // max chunks in packet
@@ -245,7 +247,7 @@ public class DFSOutputStream extends FSO
buffer.mark();
PacketHeader header = new PacketHeader(
- pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen);
+ pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
header.putInBuffer(buffer);
buffer.reset();
@@ -507,8 +509,15 @@ public class DFSOutputStream extends FSO
}
// write out data to remote datanode
- blockStream.write(buf.array(), buf.position(), buf.remaining());
- blockStream.flush();
+ try {
+ blockStream.write(buf.array(), buf.position(), buf.remaining());
+ blockStream.flush();
+ } catch (IOException e) {
+ // HDFS-3398 treat primary DN is down since client is unable to
+ // write to primary DN
+ errorIndex = 0;
+ throw e;
+ }
lastPacket = System.currentTimeMillis();
if (one.isHeartbeatPacket()) { //heartbeat packet
@@ -965,6 +974,7 @@ public class DFSOutputStream extends FSO
DatanodeInfo[] nodes = null;
int count = dfsClient.getConf().nBlockWriteRetry;
boolean success = false;
+ ExtendedBlock oldBlock = block;
do {
hasError = false;
lastException = null;
@@ -972,9 +982,11 @@ public class DFSOutputStream extends FSO
success = false;
long startTime = System.currentTimeMillis();
- DatanodeInfo[] w = excludedNodes.toArray(
+ DatanodeInfo[] excluded = excludedNodes.toArray(
new DatanodeInfo[excludedNodes.size()]);
- lb = locateFollowingBlock(startTime, w.length > 0 ? w : null);
+ block = oldBlock;
+ lb = locateFollowingBlock(startTime,
+ excluded.length > 0 ? excluded : null);
block = lb.getBlock();
block.setNumBytes(0);
accessToken = lb.getBlockToken();
@@ -1239,6 +1251,7 @@ public class DFSOutputStream extends FSO
long blockSize, Progressable progress, int buffersize,
DataChecksum checksum) throws IOException {
this(dfsClient, src, blockSize, progress, checksum, replication);
+ this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
computePacketChunkSize(dfsClient.getConf().writePacketSize,
checksum.getBytesPerChecksum());
@@ -1421,6 +1434,7 @@ public class DFSOutputStream extends FSO
currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0,
bytesCurBlock);
currentPacket.lastPacketInBlock = true;
+ currentPacket.syncBlock = shouldSyncBlock;
waitAndQueueCurrentPacket();
bytesCurBlock = 0;
lastFlushOffset = 0;
@@ -1440,6 +1454,24 @@ public class DFSOutputStream extends FSO
*/
@Override
public void hflush() throws IOException {
+ flushOrSync(false);
+ }
+
+ /**
+ * The expected semantics is all data have flushed out to all replicas
+ * and all replicas have done posix fsync equivalent - ie the OS has
+ * flushed it to the disk device (but the disk may have it in its cache).
+ *
+ * Note that only the current block is flushed to the disk device.
+ * To guarantee durable sync across block boundaries the stream should
+ * be created with {@link CreateFlag#SYNC_BLOCK}.
+ */
+ @Override
+ public void hsync() throws IOException {
+ flushOrSync(true);
+ }
+
+ private void flushOrSync(boolean isSync) throws IOException {
dfsClient.checkOpen();
isClosed();
try {
@@ -1467,7 +1499,13 @@ public class DFSOutputStream extends FSO
assert bytesCurBlock > lastFlushOffset;
// record the valid offset of this flush
lastFlushOffset = bytesCurBlock;
- waitAndQueueCurrentPacket();
+ if (isSync && currentPacket == null) {
+ // Nothing to send right now,
+ // but sync was requested.
+ // Send an empty packet
+ currentPacket = new Packet(packetSize, chunksPerPacket,
+ bytesCurBlock);
+ }
} else {
// We already flushed up to this offset.
// This means that we haven't written anything since the last flush
@@ -1477,8 +1515,21 @@ public class DFSOutputStream extends FSO
assert oldCurrentPacket == null :
"Empty flush should not occur with a currentPacket";
- // just discard the current packet since it is already been sent.
- currentPacket = null;
+ if (isSync && bytesCurBlock > 0) {
+ // Nothing to send right now,
+ // and the block was partially written,
+ // and sync was requested.
+ // So send an empty sync packet.
+ currentPacket = new Packet(packetSize, chunksPerPacket,
+ bytesCurBlock);
+ } else {
+ // just discard the current packet since it is already been sent.
+ currentPacket = null;
+ }
+ }
+ if (currentPacket != null) {
+ currentPacket.syncBlock = isSync;
+ waitAndQueueCurrentPacket();
}
// Restore state of stream. Record the last flush offset
// of the last full chunk that was flushed.
@@ -1530,18 +1581,6 @@ public class DFSOutputStream extends FSO
}
/**
- * The expected semantics is all data have flushed out to all replicas
- * and all replicas have done posix fsync equivalent - ie the OS has
- * flushed it to the disk device (but the disk may have it in its cache).
- *
- * Right now by default it is implemented as hflush
- */
- @Override
- public synchronized void hsync() throws IOException {
- hflush();
- }
-
- /**
* @deprecated use {@link HdfsDataOutputStream#getCurrentBlockReplication()}.
*/
@Deprecated
@@ -1665,6 +1704,7 @@ public class DFSOutputStream extends FSO
currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0,
bytesCurBlock);
currentPacket.lastPacketInBlock = true;
+ currentPacket.syncBlock = shouldSyncBlock;
}
flushInternal(); // flush all data to Datanodes
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Wed Jun 6 00:17:38 2012
@@ -42,6 +42,7 @@ import org.apache.hadoop.HadoopIllegalAr
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -288,7 +289,7 @@ public class DFSUtil {
* @return collection of nameservice Ids, or null if not specified
*/
public static Collection<String> getNameServiceIds(Configuration conf) {
- return conf.getTrimmedStringCollection(DFS_FEDERATION_NAMESERVICES);
+ return conf.getTrimmedStringCollection(DFS_NAMESERVICES);
}
/**
@@ -609,6 +610,14 @@ public class DFSUtil {
public static Collection<URI> getNameServiceUris(Configuration conf,
String... keys) {
Set<URI> ret = new HashSet<URI>();
+
+ // We're passed multiple possible configuration keys for any given NN or HA
+ // nameservice, and search the config in order of these keys. In order to
+ // make sure that a later config lookup (e.g. fs.defaultFS) doesn't add a
+ // URI for a config key for which we've already found a preferred entry, we
+ // keep track of non-preferred keys here.
+ Set<URI> nonPreferredUris = new HashSet<URI>();
+
for (String nsId : getNameServiceIds(conf)) {
if (HAUtil.isHAEnabled(conf, nsId)) {
// Add the logical URI of the nameservice.
@@ -619,24 +628,46 @@ public class DFSUtil {
}
} else {
// Add the URI corresponding to the address of the NN.
+ boolean uriFound = false;
for (String key : keys) {
String addr = conf.get(concatSuffixes(key, nsId));
if (addr != null) {
- ret.add(createUri(HdfsConstants.HDFS_URI_SCHEME,
- NetUtils.createSocketAddr(addr)));
- break;
+ URI uri = createUri(HdfsConstants.HDFS_URI_SCHEME,
+ NetUtils.createSocketAddr(addr));
+ if (!uriFound) {
+ uriFound = true;
+ ret.add(uri);
+ } else {
+ nonPreferredUris.add(uri);
+ }
}
}
}
}
+
// Add the generic configuration keys.
+ boolean uriFound = false;
for (String key : keys) {
String addr = conf.get(key);
if (addr != null) {
- ret.add(createUri("hdfs", NetUtils.createSocketAddr(addr)));
- break;
+ URI uri = createUri("hdfs", NetUtils.createSocketAddr(addr));
+ if (!uriFound) {
+ uriFound = true;
+ ret.add(uri);
+ } else {
+ nonPreferredUris.add(uri);
+ }
}
}
+
+ // Add the default URI if it is an HDFS URI.
+ URI defaultUri = FileSystem.getDefaultUri(conf);
+ if (defaultUri != null &&
+ HdfsConstants.HDFS_URI_SCHEME.equals(defaultUri.getScheme()) &&
+ !nonPreferredUris.contains(defaultUri)) {
+ ret.add(defaultUri);
+ }
+
return ret;
}
@@ -676,9 +707,10 @@ public class DFSUtil {
* @param httpsAddress -If true, and if security is enabled, returns server
* https address. If false, returns server http address.
* @return server http or https address
+ * @throws IOException
*/
- public static String getInfoServer(
- InetSocketAddress namenodeAddr, Configuration conf, boolean httpsAddress) {
+ public static String getInfoServer(InetSocketAddress namenodeAddr,
+ Configuration conf, boolean httpsAddress) throws IOException {
boolean securityOn = UserGroupInformation.isSecurityEnabled();
String httpAddressKey = (securityOn && httpsAddress) ?
DFS_NAMENODE_HTTPS_ADDRESS_KEY : DFS_NAMENODE_HTTP_ADDRESS_KEY;
@@ -695,8 +727,14 @@ public class DFSUtil {
} else {
suffixes = new String[2];
}
-
- return getSuffixedConf(conf, httpAddressKey, httpAddressDefault, suffixes);
+ String configuredInfoAddr = getSuffixedConf(conf, httpAddressKey,
+ httpAddressDefault, suffixes);
+ if (namenodeAddr != null) {
+ return substituteForWildcardAddress(configuredInfoAddr,
+ namenodeAddr.getHostName());
+ } else {
+ return configuredInfoAddr;
+ }
}
@@ -721,7 +759,7 @@ public class DFSUtil {
if (UserGroupInformation.isSecurityEnabled() &&
defaultSockAddr.getAddress().isAnyLocalAddress()) {
throw new IOException("Cannot use a wildcard address with security. " +
- "Must explicitly set bind address for Kerberos");
+ "Must explicitly set bind address for Kerberos");
}
return defaultHost + ":" + sockAddr.getPort();
} else {
@@ -843,7 +881,7 @@ public class DFSUtil {
* Get the nameservice Id by matching the {@code addressKey} with the
* the address of the local node.
*
- * If {@link DFSConfigKeys#DFS_FEDERATION_NAMESERVICE_ID} is not specifically
+ * If {@link DFSConfigKeys#DFS_NAMESERVICE_ID} is not specifically
* configured, and more than one nameservice Id is configured, this method
* determines the nameservice Id by matching the local node's address with the
* configured addresses. When a match is found, it returns the nameservice Id
@@ -855,7 +893,7 @@ public class DFSUtil {
* @throws HadoopIllegalArgumentException on error
*/
private static String getNameServiceId(Configuration conf, String addressKey) {
- String nameserviceId = conf.get(DFS_FEDERATION_NAMESERVICE_ID);
+ String nameserviceId = conf.get(DFS_NAMESERVICE_ID);
if (nameserviceId != null) {
return nameserviceId;
}
@@ -927,7 +965,7 @@ public class DFSUtil {
if (found > 1) { // Only one address must match the local address
String msg = "Configuration has multiple addresses that match "
+ "local node's address. Please configure the system with "
- + DFS_FEDERATION_NAMESERVICE_ID + " and "
+ + DFS_NAMESERVICE_ID + " and "
+ DFS_HA_NAMENODE_ID_KEY;
throw new HadoopIllegalArgumentException(msg);
}
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Wed Jun 6 00:17:38 2012
@@ -223,12 +223,19 @@ public class DistributedFileSystem exten
@Override
public HdfsDataOutputStream create(Path f, FsPermission permission,
- boolean overwrite, int bufferSize, short replication, long blockSize,
+ boolean overwrite, int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException {
+ return create(f, permission,
+ overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+ : EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
+ blockSize, progress);
+ }
+
+ @Override
+ public HdfsDataOutputStream create(Path f, FsPermission permission,
+ EnumSet<CreateFlag> cflags, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
- final EnumSet<CreateFlag> cflags = overwrite?
- EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
- : EnumSet.of(CreateFlag.CREATE);
final DFSOutputStream out = dfs.create(getPathName(f), permission, cflags,
replication, blockSize, progress, bufferSize);
return new HdfsDataOutputStream(out, statistics);
@@ -249,6 +256,7 @@ public class DistributedFileSystem exten
/**
* Same as create(), except fails if parent directory doesn't already exist.
*/
+ @Override
public HdfsDataOutputStream createNonRecursive(Path f, FsPermission permission,
EnumSet<CreateFlag> flag, int bufferSize, short replication,
long blockSize, Progressable progress) throws IOException {
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java Wed Jun 6 00:17:38 2012
@@ -142,7 +142,7 @@ public class HAUtil {
Preconditions.checkArgument(nsId != null,
"Could not determine namespace id. Please ensure that this " +
"machine is one of the machines listed as a NN RPC address, " +
- "or configure " + DFSConfigKeys.DFS_FEDERATION_NAMESERVICE_ID);
+ "or configure " + DFSConfigKeys.DFS_NAMESERVICE_ID);
Collection<String> nnIds = DFSUtil.getNameNodeIds(myConf, nsId);
String myNNId = myConf.get(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY);
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java Wed Jun 6 00:17:38 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.ZKFCProtocol;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -47,6 +48,8 @@ public class HDFSPolicyProvider extends
new Service("security.namenode.protocol.acl", NamenodeProtocol.class),
new Service(CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL,
HAServiceProtocol.class),
+ new Service(CommonConfigurationKeys.SECURITY_ZKFC_PROTOCOL_ACL,
+ ZKFCProtocol.class),
new Service(
CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_POLICY,
RefreshAuthorizationPolicyProtocol.class),
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java Wed Jun 6 00:17:38 2012
@@ -63,7 +63,7 @@ public class HdfsConfiguration extends C
}
private static void deprecate(String oldKey, String newKey) {
- Configuration.addDeprecation(oldKey, new String[]{newKey});
+ Configuration.addDeprecation(oldKey, newKey);
}
private static void addDeprecatedKeys() {
@@ -102,5 +102,7 @@ public class HdfsConfiguration extends C
deprecate("dfs.block.size", DFSConfigKeys.DFS_BLOCK_SIZE_KEY);
deprecate("dfs.datanode.max.xcievers", DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY);
deprecate("io.bytes.per.checksum", DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY);
+ deprecate("dfs.federation.nameservices", DFSConfigKeys.DFS_NAMESERVICES);
+ deprecate("dfs.federation.nameservice.id", DFSConfigKeys.DFS_NAMESERVICE_ID);
}
}
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java Wed Jun 6 00:17:38 2012
@@ -214,6 +214,17 @@ public class Block implements Writable,
}
return compareTo((Block)o) == 0;
}
+
+ /**
+ * @return true if the two blocks have the same block ID and the same
+ * generation stamp, or if both blocks are null.
+ */
+ public static boolean matchingIdAndGenStamp(Block a, Block b) {
+ if (a == b) return true; // same block, or both null
+ if (a == null || b == null) return false; // only one null
+ return a.blockId == b.blockId &&
+ a.generationStamp == b.generationStamp;
+ }
@Override // Object
public int hashCode() {
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Wed Jun 6 00:17:38 2012
@@ -309,6 +309,7 @@ public interface ClientProtocol {
* @throws UnresolvedLinkException If <code>src</code> contains a symlink
* @throws IOException If an I/O error occurred
*/
+ @Idempotent
public LocatedBlock addBlock(String src, String clientName,
ExtendedBlock previous, DatanodeInfo[] excludeNodes)
throws AccessControlException, FileNotFoundException,
@@ -362,6 +363,7 @@ public interface ClientProtocol {
* @throws UnresolvedLinkException If <code>src</code> contains a symlink
* @throws IOException If an I/O error occurred
*/
+ @Idempotent
public boolean complete(String src, String clientName, ExtendedBlock last)
throws AccessControlException, FileNotFoundException, SafeModeException,
UnresolvedLinkException, IOException;
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java Wed Jun 6 00:17:38 2012
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
/**
* This class represents the primary identifier for a Datanode.
@@ -45,23 +44,6 @@ public class DatanodeID implements Compa
protected int infoPort; // info server port
protected int ipcPort; // IPC server port
- public DatanodeID(String ipAddr, int xferPort) {
- this(ipAddr, "", "", xferPort,
- DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
- DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT);
- }
-
- public DatanodeID(String ipAddr, String hostName, int xferPort) {
- this(ipAddr, hostName, "", xferPort,
- DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
- DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT);
- }
-
- /**
- * DatanodeID copy constructor
- *
- * @param from
- */
public DatanodeID(DatanodeID from) {
this(from.getIpAddr(),
from.getHostName(),
@@ -72,7 +54,7 @@ public class DatanodeID implements Compa
}
/**
- * Create DatanodeID
+ * Create a DatanodeID
* @param ipAddr IP
* @param hostName hostname
* @param storageID data storage ID
@@ -94,22 +76,6 @@ public class DatanodeID implements Compa
this.ipAddr = ipAddr;
}
- public void setHostName(String hostName) {
- this.hostName = hostName;
- }
-
- public void setXferPort(int xferPort) {
- this.xferPort = xferPort;
- }
-
- public void setInfoPort(int infoPort) {
- this.infoPort = infoPort;
- }
-
- public void setIpcPort(int ipcPort) {
- this.ipcPort = ipcPort;
- }
-
public void setStorageID(String storageID) {
this.storageID = storageID;
}
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java Wed Jun 6 00:17:38 2012
@@ -22,11 +22,11 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
-/****************************************************
- * A LocatedBlock is a pair of Block, DatanodeInfo[]
- * objects. It tells where to find a Block.
- *
- ****************************************************/
+/**
+ * Associates a block with the Datanodes that contain its replicas
+ * and other block metadata (E.g. the file offset associated with this
+ * block, whether it is corrupt, security token, etc).
+ */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class LocatedBlock {
@@ -40,19 +40,6 @@ public class LocatedBlock {
private boolean corrupt;
private Token<BlockTokenIdentifier> blockToken = new Token<BlockTokenIdentifier>();
- public LocatedBlock() {
- this(new ExtendedBlock(), new DatanodeInfo[0], 0L, false);
- }
-
-
- public LocatedBlock(ExtendedBlock eb) {
- this(eb, new DatanodeInfo[0], 0L, false);
- }
-
- public LocatedBlock(String bpid, Block b, DatanodeInfo[] locs) {
- this(new ExtendedBlock(bpid, b), locs, -1, false); // startOffset is unknown
- }
-
public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) {
this(b, locs, -1, false); // startOffset is unknown
}
@@ -81,14 +68,10 @@ public class LocatedBlock {
this.blockToken = token;
}
- /**
- */
public ExtendedBlock getBlock() {
return b;
}
- /**
- */
public DatanodeInfo[] getLocations() {
return locs;
}
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java Wed Jun 6 00:17:38 2012
@@ -105,8 +105,9 @@ public class LocatedBlocks {
* @return block if found, or null otherwise.
*/
public int findBlock(long offset) {
- // create fake block of size 1 as a key
- LocatedBlock key = new LocatedBlock();
+ // create fake block of size 0 as a key
+ LocatedBlock key = new LocatedBlock(
+ new ExtendedBlock(), new DatanodeInfo[0], 0L, false);
key.setStartOffset(offset);
key.getBlock().setNumBytes(1);
Comparator<LocatedBlock> comp =
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java Wed Jun 6 00:17:38 2012
@@ -40,6 +40,7 @@ public class PacketHeader {
.setSeqno(0)
.setLastPacketInBlock(false)
.setDataLen(0)
+ .setSyncBlock(false)
.build().getSerializedSize();
public static final int PKT_HEADER_LEN =
6 + PROTO_SIZE;
@@ -51,13 +52,14 @@ public class PacketHeader {
}
public PacketHeader(int packetLen, long offsetInBlock, long seqno,
- boolean lastPacketInBlock, int dataLen) {
+ boolean lastPacketInBlock, int dataLen, boolean syncBlock) {
this.packetLen = packetLen;
proto = PacketHeaderProto.newBuilder()
.setOffsetInBlock(offsetInBlock)
.setSeqno(seqno)
.setLastPacketInBlock(lastPacketInBlock)
.setDataLen(dataLen)
+ .setSyncBlock(syncBlock)
.build();
}
@@ -81,6 +83,10 @@ public class PacketHeader {
return packetLen;
}
+ public boolean getSyncBlock() {
+ return proto.getSyncBlock();
+ }
+
@Override
public String toString() {
return "PacketHeader with packetLen=" + packetLen +
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/GetUserMappingsProtocolPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/GetUserMappingsProtocolPB.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/GetUserMappingsProtocolPB.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/GetUserMappingsProtocolPB.java Wed Jun 6 00:17:38 2012
@@ -20,9 +20,13 @@ package org.apache.hadoop.hdfs.protocolP
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.protocol.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService;
import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
+@KerberosInfo(
+ serverPrincipal=CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY)
@ProtocolInfo(
protocolName = "org.apache.hadoop.tools.GetUserMappingsProtocol",
protocolVersion = 1)
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java Wed Jun 6 00:17:38 2012
@@ -32,6 +32,8 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterRequestProto;
@@ -104,6 +106,20 @@ public class NamenodeProtocolServerSideT
}
return GetTransactionIdResponseProto.newBuilder().setTxId(txid).build();
}
+
+ @Override
+ public GetMostRecentCheckpointTxIdResponseProto getMostRecentCheckpointTxId(
+ RpcController unused, GetMostRecentCheckpointTxIdRequestProto request)
+ throws ServiceException {
+ long txid;
+ try {
+ txid = impl.getMostRecentCheckpointTxId();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return GetMostRecentCheckpointTxIdResponseProto.newBuilder().setTxId(txid).build();
+ }
+
@Override
public RollEditLogResponseProto rollEditLog(RpcController unused,
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java Wed Jun 6 00:17:38 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogRequestProto;
@@ -120,6 +121,16 @@ public class NamenodeProtocolTranslatorP
}
@Override
+ public long getMostRecentCheckpointTxId() throws IOException {
+ try {
+ return rpcProxy.getMostRecentCheckpointTxId(NULL_CONTROLLER,
+ GetMostRecentCheckpointTxIdRequestProto.getDefaultInstance()).getTxId();
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
public CheckpointSignature rollEditLog() throws IOException {
try {
return PBHelper.convert(rpcProxy.rollEditLog(NULL_CONTROLLER,
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Wed Jun 6 00:17:38 2012
@@ -254,11 +254,11 @@ public class PBHelper {
public static BlockWithLocationsProto convert(BlockWithLocations blk) {
return BlockWithLocationsProto.newBuilder()
.setBlock(convert(blk.getBlock()))
- .addAllDatanodeIDs(Arrays.asList(blk.getDatanodes())).build();
+ .addAllStorageIDs(Arrays.asList(blk.getStorageIDs())).build();
}
public static BlockWithLocations convert(BlockWithLocationsProto b) {
- return new BlockWithLocations(convert(b.getBlock()), b.getDatanodeIDsList()
+ return new BlockWithLocations(convert(b.getBlock()), b.getStorageIDsList()
.toArray(new String[0]));
}
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Wed Jun 6 00:17:38 2012
@@ -205,6 +205,7 @@ public class Balancer {
private Map<Block, BalancerBlock> globalBlockList
= new HashMap<Block, BalancerBlock>();
private MovedBlocks movedBlocks = new MovedBlocks();
+ // Map storage IDs to BalancerDatanodes
private Map<String, BalancerDatanode> datanodes
= new HashMap<String, BalancerDatanode>();
@@ -262,9 +263,9 @@ public class Balancer {
if (LOG.isDebugEnabled()) {
LOG.debug("Decided to move block "+ block.getBlockId()
+" with a length of "+StringUtils.byteDesc(block.getNumBytes())
- + " bytes from " + source.getName()
- + " to " + target.getName()
- + " using proxy source " + proxySource.getName() );
+ + " bytes from " + source.getDisplayName()
+ + " to " + target.getDisplayName()
+ + " using proxy source " + proxySource.getDisplayName() );
}
return true;
}
@@ -317,15 +318,15 @@ public class Balancer {
receiveResponse(in);
bytesMoved.inc(block.getNumBytes());
LOG.info( "Moving block " + block.getBlock().getBlockId() +
- " from "+ source.getName() + " to " +
- target.getName() + " through " +
- proxySource.getName() +
+ " from "+ source.getDisplayName() + " to " +
+ target.getDisplayName() + " through " +
+ proxySource.getDisplayName() +
" is succeeded." );
} catch (IOException e) {
LOG.warn("Error moving block "+block.getBlockId()+
- " from " + source.getName() + " to " +
- target.getName() + " through " +
- proxySource.getName() +
+ " from " + source.getDisplayName() + " to " +
+ target.getDisplayName() + " through " +
+ proxySource.getDisplayName() +
": "+e.getMessage());
} finally {
IOUtils.closeStream(out);
@@ -378,7 +379,8 @@ public class Balancer {
public void run() {
if (LOG.isDebugEnabled()) {
LOG.debug("Starting moving "+ block.getBlockId() +
- " from " + proxySource.getName() + " to " + target.getName());
+ " from " + proxySource.getDisplayName() + " to " +
+ target.getDisplayName());
}
dispatch();
}
@@ -475,7 +477,7 @@ public class Balancer {
@Override
public String toString() {
- return getClass().getSimpleName() + "[" + getName()
+ return getClass().getSimpleName() + "[" + datanode
+ ", utilization=" + utilization + "]";
}
@@ -507,8 +509,8 @@ public class Balancer {
}
/** Get the name of the datanode */
- protected String getName() {
- return datanode.getName();
+ protected String getDisplayName() {
+ return datanode.toString();
}
/* Get the storage id of the datanode */
@@ -620,8 +622,8 @@ public class Balancer {
synchronized (block) {
// update locations
- for ( String location : blk.getDatanodes() ) {
- BalancerDatanode datanode = datanodes.get(location);
+ for ( String storageID : blk.getStorageIDs() ) {
+ BalancerDatanode datanode = datanodes.get(storageID);
if (datanode != null) { // not an unknown datanode
block.addLocation(datanode);
}
@@ -831,7 +833,7 @@ public class Balancer {
this.aboveAvgUtilizedDatanodes.add((Source)datanodeS);
} else {
assert(isOverUtilized(datanodeS)) :
- datanodeS.getName()+ "is not an overUtilized node";
+ datanodeS.getDisplayName()+ "is not an overUtilized node";
this.overUtilizedDatanodes.add((Source)datanodeS);
overLoadedBytes += (long)((datanodeS.utilization-avg
-threshold)*datanodeS.datanode.getCapacity()/100.0);
@@ -842,7 +844,7 @@ public class Balancer {
this.belowAvgUtilizedDatanodes.add(datanodeS);
} else {
assert isUnderUtilized(datanodeS) : "isUnderUtilized("
- + datanodeS.getName() + ")=" + isUnderUtilized(datanodeS)
+ + datanodeS.getDisplayName() + ")=" + isUnderUtilized(datanodeS)
+ ", utilization=" + datanodeS.utilization;
this.underUtilizedDatanodes.add(datanodeS);
underLoadedBytes += (long)((avg-threshold-
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Wed Jun 6 00:17:38 2012
@@ -200,7 +200,7 @@ class NameNodeConnector {
Thread.sleep(keyUpdaterInterval);
}
} catch (InterruptedException e) {
- LOG.info("InterruptedException in block key updater thread", e);
+ LOG.debug("InterruptedException in block key updater thread", e);
} catch (Throwable e) {
LOG.error("Exception in block key updater thread", e);
shouldRun = false;
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java Wed Jun 6 00:17:38 2012
@@ -19,9 +19,6 @@ package org.apache.hadoop.hdfs.server.bl
import java.io.IOException;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.fs.ContentSummary;
/**
@@ -31,19 +28,24 @@ import org.apache.hadoop.fs.ContentSumma
public interface BlockCollection {
/**
* Get the last block of the collection.
- * Make sure it has the right type.
*/
- public <T extends BlockInfo> T getLastBlock() throws IOException;
+ public BlockInfo getLastBlock() throws IOException;
/**
* Get content summary.
*/
public ContentSummary computeContentSummary();
- /** @return the number of blocks */
+ /**
+ * @return the number of blocks
+ */
public int numBlocks();
+ /**
+ * Get the blocks.
+ */
public BlockInfo[] getBlocks();
+
/**
* Get preferred block size for the collection
* @return preferred block size in bytes
@@ -57,7 +59,7 @@ public interface BlockCollection {
public short getReplication();
/**
- * Get name of collection.
+ * Get the name of the collection.
*/
public String getName();
}
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1346682&r1=1346681&r2=1346682&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Wed Jun 6 00:17:38 2012
@@ -437,7 +437,7 @@ public class BlockManager {
* @throws IOException if the block does not have at least a minimal number
* of replicas reported from data-nodes.
*/
- private boolean commitBlock(final BlockInfoUnderConstruction block,
+ private static boolean commitBlock(final BlockInfoUnderConstruction block,
final Block commitBlock) throws IOException {
if (block.getBlockUCState() == BlockUCState.COMMITTED)
return false;