Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2007/10/05 01:59:02 UTC
svn commit: r582029 - in /lucene/hadoop/branches/branch-0.14: ./
src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/
Author: dhruba
Date: Thu Oct 4 16:59:00 2007
New Revision: 582029
URL: http://svn.apache.org/viewvc?rev=582029&view=rev
Log:
HADOOP-1955. The Namenode tries not to pick the same source Datanode for
a replication request if an earlier replication request for the same
block from that source Datanode had failed.
(Raghu Angadi via dhruba)
This corresponds to changelist 582028 on trunk.
Modified:
lucene/hadoop/branches/branch-0.14/CHANGES.txt
lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSDataset.java
lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSNamesystem.java
lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java
lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/TestReplication.java
Modified: lucene/hadoop/branches/branch-0.14/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/CHANGES.txt?rev=582029&r1=582028&r2=582029&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.14/CHANGES.txt Thu Oct 4 16:59:00 2007
@@ -26,6 +26,11 @@
HADOOP-1978. Name-node removes edits.new after a successful startup.
(Konstantin Shvachko via dhruba)
+ HADOOP-1955. The Namenode tries not to pick the same source Datanode
+ for a replication request if an earlier replication request for the
+ same block from that source Datanode had failed.
+ (Raghu Angadi via dhruba)
+
Release 0.14.1 - 2007-09-04
BUG FIXES
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSDataset.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSDataset.java?rev=582029&r1=582028&r2=582029&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSDataset.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSDataset.java Thu Oct 4 16:59:00 2007
@@ -442,7 +442,9 @@
private HashMap<Block,FSVolume> volumeMap = null;
private HashMap<Block,File> blockMap = null;
static Random random = new Random();
-
+
+ long blockWriteTimeout = 3600 * 1000;
+
/**
* An FSDataset has a directory where it loads its data files.
*/
@@ -457,6 +459,8 @@
volumes.getVolumeMap(volumeMap);
blockMap = new HashMap<Block,File>();
volumes.getBlockMap(blockMap);
+ blockWriteTimeout = Math.max(
+ conf.getInt("dfs.datanode.block.write.timeout.sec", 3600), 1) * 1000;
}
/**
@@ -526,8 +530,9 @@
//
if (ongoingCreates.containsKey(b)) {
// check how old is the temp file - wait 1 hour
- File tmp = (File)ongoingCreates.get(b);
- if ((System.currentTimeMillis() - tmp.lastModified()) < 3600 * 1000) {
+ File tmp = ongoingCreates.get(b);
+ if ((System.currentTimeMillis() - tmp.lastModified()) <
+ blockWriteTimeout) {
throw new IOException("Block " + b +
" has already been started (though not completed), and thus cannot be created.");
} else {
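
The FSDataset change above replaces the hardcoded one-hour wait on a half-written temporary block file with a timeout read from dfs.datanode.block.write.timeout.sec, floored at one second. A minimal standalone sketch of that logic, with hypothetical names (StaleWriteCheck, isWriteStale) that do not appear in the patch:

    import java.io.File;

    // Sketch of the configurable stale-write timeout introduced above.
    class StaleWriteCheck {
      private final long blockWriteTimeout; // milliseconds

      // Mirrors the patch: the value is read in seconds (default 3600,
      // i.e. one hour), floored at 1, then converted to milliseconds.
      StaleWriteCheck(int timeoutSec) {
        this.blockWriteTimeout = Math.max(timeoutSec, 1) * 1000L;
      }

      // A temporary block file younger than the timeout indicates a write
      // still in progress, so a second create of the same block is refused;
      // an older file is considered abandoned and may be reclaimed.
      boolean isWriteStale(File tmp) {
        return (System.currentTimeMillis() - tmp.lastModified()) >= blockWriteTimeout;
      }
    }

Making the timeout configurable matters for the new test below, which sets it to 5 seconds so that retried replications are not blocked for an hour by leftover temp files.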
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=582029&r1=582028&r2=582029&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Oct 4 16:59:00 2007
@@ -183,7 +183,7 @@
private long replicationRecheckInterval;
//decommissionRecheckInterval is how often namenode checks if a node has finished decommission
private long decommissionRecheckInterval;
- static int replIndex = 0; // last datanode used for replication work
+ private int replIndex = 0; // last datanode used for replication work
static int REPL_WORK_PER_ITERATION = 32; // max percent datanodes per iteration
public static FSNamesystem fsNamesystemObject;
@@ -217,7 +217,9 @@
this.dir.loadFSImage(getNamespaceDirs(conf), startOpt);
this.safeMode = new SafeModeInfo(conf);
setBlockTotal();
- pendingReplications = new PendingReplicationBlocks(LOG);
+ pendingReplications = new PendingReplicationBlocks(LOG,
+ conf.getInt("dfs.replication.pending.timeout.sec",
+ -1) * 1000);
this.hbthread = new Daemon(new HeartbeatMonitor());
this.lmthread = new Daemon(new LeaseMonitor());
this.replthread = new Daemon(new ReplicationMonitor());
@@ -1886,6 +1888,7 @@
int numiter = 0;
int foundwork = 0;
int hsize = 0;
+ int lastReplIndex = -1;
while (true) {
DatanodeDescriptor node = null;
@@ -1897,6 +1900,11 @@
synchronized (heartbeats) {
hsize = heartbeats.size();
if (numiter++ >= hsize) {
+ // no change in replIndex.
+ if (lastReplIndex >= 0) {
+ //next time, start after where the last replication was scheduled
+ replIndex = lastReplIndex;
+ }
break;
}
if (replIndex >= hsize) {
@@ -1922,6 +1930,7 @@
doReplication = true;
addBlocksToBeReplicated(node, (Block[])replsets[0],
(DatanodeDescriptor[][])replsets[1]);
+ lastReplIndex = replIndex;
}
}
if (!doReplication) {
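
Two things change in FSNamesystem: replIndex becomes a private instance field instead of a static, and the replication monitor now resumes its next scan just past the last datanode that actually had work scheduled, rather than wherever the previous scan happened to stop. Combined with the pending-replication timeout, this is what keeps a failed source Datanode from being picked again immediately. A simplified sketch of the scan pattern, with hypothetical names (ReplicationScan, scheduleWork) standing in for the replication monitor's per-node loop shown in the hunks above:

    import java.util.List;

    // Simplified round-robin scan illustrating the lastReplIndex fix above.
    // The real code iterates the heartbeats list under its lock.
    class ReplicationScan {
      private int replIndex = 0; // where the next scan starts

      void scanOnce(List<String> nodes) {
        int lastReplIndex = -1;
        int hsize = nodes.size();
        for (int numiter = 0; numiter < hsize; numiter++) {
          if (replIndex >= hsize) {
            replIndex = 0; // wrap around the node list
          }
          String node = nodes.get(replIndex++);
          if (scheduleWork(node)) {
            // Remember where work was last scheduled, so the next scan
            // starts after this node instead of after nodes that had
            // nothing to do.
            lastReplIndex = replIndex;
          }
        }
        if (lastReplIndex >= 0) {
          replIndex = lastReplIndex;
        }
      }

      private boolean scheduleWork(String node) {
        return false; // placeholder for the real per-node replication check
      }
    }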
Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java?rev=582029&r1=582028&r2=582029&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java Thu Oct 4 16:59:00 2007
@@ -49,12 +49,14 @@
private long defaultRecheckInterval = 5 * 60 * 1000;
PendingReplicationBlocks(long timeoutPeriod) {
- this.timeout = timeoutPeriod;
- init();
+ this(null, timeoutPeriod);
}
- PendingReplicationBlocks(Log log) {
+ PendingReplicationBlocks(Log log, long timeoutPeriod) {
this.LOG = log;
+ if ( timeoutPeriod > 0 ) {
+ this.timeout = timeoutPeriod;
+ }
init();
}
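
PendingReplicationBlocks now funnels both constructors through one implementation and treats any non-positive timeout as "keep the built-in default". That matches the FSNamesystem change above, which passes -1 * 1000 when dfs.replication.pending.timeout.sec is unset. The pattern in isolation, with a hypothetical class name and an illustrative default (the real default value is declared outside this hunk):

    // The "non-positive means default" constructor pattern used above.
    class PendingTimeout {
      private static final long DEFAULT_TIMEOUT_MS = 5 * 60 * 1000L; // illustrative
      private long timeout = DEFAULT_TIMEOUT_MS;

      PendingTimeout() {
        this(-1); // no explicit timeout: fall through to the default
      }

      PendingTimeout(long timeoutPeriod) {
        // The -1000 produced by an unset config key lands here harmlessly.
        if (timeoutPeriod > 0) {
          this.timeout = timeoutPeriod;
        }
      }

      long getTimeout() {
        return timeout;
      }
    }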
Modified: lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java?rev=582029&r1=582028&r2=582029&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java Thu Oct 4 16:59:00 2007
@@ -113,7 +113,9 @@
conf.set("dfs.name.dir", new File(base_dir, "name1").getPath()+","+
new File(base_dir, "name2").getPath());
}
- conf.setInt("dfs.replication", Math.min(3, numDataNodes));
+
+ int replication = conf.getInt("dfs.replication", 3);
+ conf.setInt("dfs.replication", Math.min(replication, numDataNodes));
conf.setInt("dfs.safemode.extension", 0);
conf.setInt("dfs.namenode.decommission.interval", 3 * 1000); // 3 second
Modified: lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/TestReplication.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/TestReplication.java?rev=582029&r1=582028&r2=582029&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/TestReplication.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/test/org/apache/hadoop/dfs/TestReplication.java Thu Oct 4 16:59:00 2007
@@ -19,6 +19,7 @@
import junit.framework.TestCase;
import java.io.*;
+import java.util.Iterator;
import java.util.Random;
import java.net.*;
@@ -166,4 +167,152 @@
cluster.shutdown();
}
}
+
+ // Waits for all of the blocks to reach the expected replication factor.
+ private void waitForBlockReplication(String filename,
+ ClientProtocol namenode,
+ int expected, long maxWaitSec)
+ throws IOException {
+ long start = System.currentTimeMillis();
+
+ //wait for all the blocks to be replicated;
+ System.out.println("Checking for block replication for " + filename);
+ int iters = 0;
+ while (true) {
+ boolean replOk = true;
+ LocatedBlocks blocks = namenode.getBlockLocations(filename, 0,
+ Long.MAX_VALUE);
+
+ for (Iterator<LocatedBlock> iter = blocks.getLocatedBlocks().iterator();
+ iter.hasNext();) {
+ LocatedBlock block = iter.next();
+ int actual = block.getLocations().length;
+ if ( actual < expected ) {
+ if (iters > 0) {
+ System.out.println("Not enough replicas for " + block.getBlock() +
+ " yet. Expecting " + expected + ", got " +
+ actual + ".");
+ }
+ replOk = false;
+ break;
+ }
+ }
+
+ if (replOk) {
+ return;
+ }
+
+ iters++;
+
+ if (maxWaitSec > 0 &&
+ (System.currentTimeMillis() - start) > (maxWaitSec * 1000)) {
+ throw new IOException("Timedout while waiting for all blocks to " +
+ " be replicated for " + filename);
+ }
+
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException ignored) {}
+ }
+ }
+
+ /* This test makes sure that the NameNode retries all the available
+ * replicas as sources for an under-replicated block.
+ *
+ * It creates a file with one block and a replication factor of 4. It
+ * deletes one of the replicas and corrupts two others. The expected
+ * behaviour is that the missing replica is copied from the one valid source.
+ */
+ public void testPendingReplicationRetry() throws IOException {
+
+ MiniDFSCluster cluster = null;
+ int numDataNodes = 4;
+ String testFile = "/replication-test-file";
+ Path testPath = new Path(testFile);
+
+ byte buffer[] = new byte[1024];
+ for (int i=0; i<buffer.length; i++) {
+ buffer[i] = '1';
+ }
+
+ try {
+ Configuration conf = new Configuration();
+ conf.set("dfs.replication", Integer.toString(numDataNodes));
+ //first time format
+ cluster = new MiniDFSCluster(0, conf, numDataNodes, true,
+ true, null, null);
+ cluster.waitActive();
+ DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
+ cluster.getNameNodePort()),
+ conf);
+
+ OutputStream out = cluster.getFileSystem().create(testPath);
+ out.write(buffer);
+ out.close();
+
+ waitForBlockReplication(testFile, dfsClient.namenode, numDataNodes, -1);
+
+ // get first block of the file.
+ String block = dfsClient.namenode.
+ getBlockLocations(testFile, 0, Long.MAX_VALUE).
+ get(0).getBlock().toString();
+
+ cluster.shutdown();
+ cluster = null;
+
+ //Now mess up some of the replicas.
+ //Delete the first and corrupt the next two.
+ File baseDir = new File(System.getProperty("test.build.data"),
+ "dfs/data");
+ for (int i=0; i<25; i++) {
+ buffer[i] = '0';
+ }
+
+ int fileCount = 0;
+ for (int i=0; i<6; i++) {
+ File blockFile = new File(baseDir, "data" + (i+1) + "/current/" + block);
+ System.out.println("Checking for file " + blockFile);
+
+ if (blockFile.exists()) {
+ if (fileCount == 0) {
+ assertTrue(blockFile.delete());
+ } else {
+ // corrupt it.
+ long len = blockFile.length();
+ assertTrue(len > 50);
+ RandomAccessFile blockOut = new RandomAccessFile(blockFile, "rw");
+ blockOut.seek(len/3);
+ blockOut.write(buffer, 0, 25);
+ }
+ fileCount++;
+ }
+ }
+ assertEquals(3, fileCount);
+
+ /* Start the MiniDFSCluster with more datanodes, since once a writeBlock
+ * to a datanode fails, the same block cannot be written to it again
+ * immediately. In our case some replication attempts will fail.
+ */
+ conf = new Configuration();
+ conf.set("dfs.replication", Integer.toString(numDataNodes));
+ conf.set("dfs.replication.pending.timeout.sec", Integer.toString(2));
+ conf.set("dfs.datanode.block.write.timeout.sec", Integer.toString(5));
+ conf.set("dfs.safemode.threshold.pct", "0.75f"); // only 3 copies exist
+
+ cluster = new MiniDFSCluster(0, conf, numDataNodes*2, false,
+ true, null, null);
+ cluster.waitActive();
+
+ dfsClient = new DFSClient(new InetSocketAddress("localhost",
+ cluster.getNameNodePort()),
+ conf);
+
+ waitForBlockReplication(testFile, dfsClient.namenode, numDataNodes, -1);
+
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
}
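
The core utility in the new test is waitForBlockReplication's poll-until-replicated loop: check the block locations, sleep 500 ms, and enforce an optional deadline. A generic distillation of that pattern (the Condition interface and the names below are illustrative, not part of the patch):

    import java.io.IOException;

    // Generic poll-with-deadline loop distilled from waitForBlockReplication.
    interface Condition {
      boolean holds() throws IOException;
    }

    class PollUntil {
      // maxWaitSec <= 0 means wait indefinitely, as the test does with -1.
      static void waitFor(Condition cond, long maxWaitSec) throws IOException {
        long start = System.currentTimeMillis();
        while (!cond.holds()) {
          if (maxWaitSec > 0 &&
              (System.currentTimeMillis() - start) > maxWaitSec * 1000) {
            throw new IOException("Timed out waiting for condition");
          }
          try {
            Thread.sleep(500); // same polling interval as the test
          } catch (InterruptedException ignored) {}
        }
      }
    }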