Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/05/05 19:04:24 UTC
svn commit: r400112 - in /lucene/hadoop/trunk: ./ bin/ conf/
src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/mapred/
src/java/org/apache/hadoop/util/ src/test/org/apache/hadoop/dfs/
Author: cutting
Date: Fri May 5 10:04:21 2006
New Revision: 400112
URL: http://svn.apache.org/viewcvs?rev=400112&view=rev
Log:
HADOOP-96. Logging improvements. Contributed by Hairong Kuang.
Added:
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/bin/hadoop
lucene/hadoop/trunk/bin/hadoop-daemon.sh
lucene/hadoop/trunk/bin/hadoop-daemons.sh
lucene/hadoop/trunk/conf/hadoop-default.xml
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/util/LogFormatter.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/CHANGES.txt?rev=400112&r1=400111&r2=400112&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri May 5 10:04:21 2006
@@ -188,6 +188,12 @@
status of reduce tasks or completed jobs. Also fixes the progress
meter so that failed tasks are subtracted. (omalley via cutting)
+49. HADOOP-96. Logging improvements. Log files are now separate from
+ standard output and standard error files. Logs are now rolled.
+ Logging of all DFS state changes can be enabled, to facilitate
+ debugging. (Hairong Kuang via cutting)
+
+
Release 0.1.1 - 2006-04-08
1. Added CHANGES.txt, logging all significant changes to Hadoop. (cutting)
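The per-daemon log setup added in this patch boils down to a single call. A
minimal sketch of what each daemon's main() now does, using the
initFileHandler() added to LogFormatter below (the "mydaemon" name is
illustrative):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.LogFormatter;

    public class LoggingSetupSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Redirect this process's logging to a rolled file named
        // hadoop-<user>-mydaemon-<host>.log under HADOOP_LOG_DIR.
        String logFile = LogFormatter.initFileHandler(conf, "mydaemon");
        LogFormatter.getLogger("demo").info("logging to " + logFile);
      }
    }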
Modified: lucene/hadoop/trunk/bin/hadoop
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/bin/hadoop?rev=400112&r1=400111&r2=400112&view=diff
==============================================================================
--- lucene/hadoop/trunk/bin/hadoop (original)
+++ lucene/hadoop/trunk/bin/hadoop Fri May 5 10:04:21 2006
@@ -51,7 +51,7 @@
# some directories
THIS_DIR=`dirname "$THIS"`
-HADOOP_HOME=`cd "$THIS_DIR/.." ; pwd`
+export HADOOP_HOME=`cd "$THIS_DIR/.." ; pwd`
# Allow alternate conf dir location.
HADOOP_CONF_DIR="${HADOOP_CONF_DIR:-$HADOOP_HOME/conf}"
@@ -143,6 +143,8 @@
# cygwin path translation
if expr `uname` : 'CYGWIN*' > /dev/null; then
CLASSPATH=`cygpath -p -w "$CLASSPATH"`
+ HADOOP_HOME=`cygpath -p -w "$HADOOP_HOME"`
+ HADOOP_LOG_DIR=`cygpath -p -w "$HADOOP_LOG_DIR"`
fi
# run it
Modified: lucene/hadoop/trunk/bin/hadoop-daemon.sh
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/bin/hadoop-daemon.sh?rev=400112&r1=400111&r2=400112&view=diff
==============================================================================
--- lucene/hadoop/trunk/bin/hadoop-daemon.sh (original)
+++ lucene/hadoop/trunk/bin/hadoop-daemon.sh Fri May 5 10:04:21 2006
@@ -38,7 +38,7 @@
done
# the root of the Hadoop installation
-HADOOP_HOME=`dirname "$this"`/..
+export HADOOP_HOME=`dirname "$this"`/..
# Allow alternate conf dir location.
HADOOP_CONF_DIR="${HADOOP_CONF_DIR:-$HADOOP_HOME/conf}"
@@ -49,9 +49,9 @@
# get log directory
if [ "$HADOOP_LOG_DIR" = "" ]; then
- HADOOP_LOG_DIR="$HADOOP_HOME/logs"
- mkdir -p "$HADOOP_LOG_DIR"
+ export HADOOP_LOG_DIR="$HADOOP_HOME/logs"
fi
+mkdir -p "$HADOOP_LOG_DIR"
if [ "$HADOOP_PID_DIR" = "" ]; then
HADOOP_PID_DIR=/tmp
@@ -62,7 +62,7 @@
fi
# some variables
-log=$HADOOP_LOG_DIR/hadoop-$HADOOP_IDENT_STRING-$command-`hostname`.log
+log=$HADOOP_LOG_DIR/hadoop-$HADOOP_IDENT_STRING-$command-`hostname`.out
pid=$HADOOP_PID_DIR/hadoop-$HADOOP_IDENT_STRING-$command.pid
case $startStop in
@@ -81,9 +81,8 @@
rsync -a -e ssh --delete --exclude=.svn $HADOOP_MASTER/ "$HADOOP_HOME"
fi
- cd "$HADOOP_HOME"
echo starting $command, logging to $log
- nohup bin/hadoop $command "$@" >& "$log" < /dev/null &
+ nohup $HADOOP_HOME/bin/hadoop $command "$@" >& "$log" < /dev/null &
echo $! > $pid
sleep 1; head "$log"
;;
Modified: lucene/hadoop/trunk/bin/hadoop-daemons.sh
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/bin/hadoop-daemons.sh?rev=400112&r1=400111&r2=400112&view=diff
==============================================================================
--- lucene/hadoop/trunk/bin/hadoop-daemons.sh (original)
+++ lucene/hadoop/trunk/bin/hadoop-daemons.sh Fri May 5 10:04:21 2006
@@ -13,4 +13,6 @@
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
-exec "$bin/slaves.sh" "$bin/hadoop-daemon.sh" "$@"
+HADOOP_HOME="$bin/.."
+
+exec "$bin/slaves.sh" cd "$HADOOP_HOME" \; "$bin/hadoop-daemon.sh" "$@"
Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=400112&r1=400111&r2=400112&view=diff
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Fri May 5 10:04:21 2006
@@ -7,6 +7,28 @@
<configuration>
+<!--- logging properties -->
+
+<property>
+ <name>hadoop.logfile.size</name>
+ <value>10000000</value>
+ <description>The max size of each log file</description>
+</property>
+
+<property>
+ <name>hadoop.logfile.count</name>
+ <value>10</value>
+ <description>The max number of log files kept in rotation.</description>
+</property>
+
+<property>
+ <name>dfs.namenode.logging.level</name>
+ <value>info</value>
+ <description>The logging level for dfs namenode. Other values are "dir"(trace namespace mutations), "block"(trace block under/over replications and block creations/deletions), or "all".</description>
+</property>
+
<!-- i/o properties -->
<property>
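These three properties are read with the stock Configuration accessors; a
minimal sketch of how the rest of this patch consumes them (defaults match the
values above):

    import org.apache.hadoop.conf.Configuration;

    public class LoggingConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        int size  = conf.getInt("hadoop.logfile.size", 10000000); // bytes per log file
        int count = conf.getInt("hadoop.logfile.count", 10);      // files kept in rotation
        String level = conf.get("dfs.namenode.logging.level", "info");
        System.out.println(size + " " + count + " " + level);
      }
    }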
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=400112&r1=400111&r2=400112&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Fri May 5 10:04:21 2006
@@ -824,7 +824,9 @@
/**
*/
public static void main(String args[]) throws IOException {
+ Configuration conf = new Configuration();
LogFormatter.setShowThreadIDs(true);
- runAndWait(new Configuration());
+ LogFormatter.initFileHandler(conf, "datanode");
+ runAndWait(conf);
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java?rev=400112&r1=400111&r2=400112&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java Fri May 5 10:04:21 2006
@@ -622,8 +622,12 @@
String pathString = path.toString();
mkdirs(new Path(pathString).getParent().toString());
INode newNode = new INode( new File(pathString).getName(), blocks, replication);
- if( ! unprotectedAddFile(path, newNode) )
- return false;
+ if( ! unprotectedAddFile(path, newNode) ) {
+ NameNode.stateChangeLog.info("DIR* FSDirectory.addFile: "
+ +"failed to add "+path+" with "
+ +blocks.length+" blocks to the file system" );
+ return false;
+ }
// add create file record to log
UTF8 nameReplicationPair[] = new UTF8[] {
path,
@@ -631,6 +635,8 @@
logEdit(OP_ADD,
new ArrayWritable( UTF8.class, nameReplicationPair ),
new ArrayWritable( Block.class, newNode.blocks ));
+ NameNode.stateChangeLog.fine("DIR* FSDirectory.addFile: "
+ +path+" with "+blocks.length+" blocks is added to the file system" );
return true;
}
@@ -658,6 +664,8 @@
* Change the filename
*/
public boolean renameTo(UTF8 src, UTF8 dst) {
+ NameNode.stateChangeLog.fine("DIR* FSDirectory.renameTo: "
+ +src+" to "+dst );
waitForReady();
if (unprotectedRenameTo(src, dst)) {
logEdit(OP_RENAME, src, dst);
@@ -675,6 +683,8 @@
String dstStr = dst.toString();
INode renamedNode = rootDir.getNode(srcStr);
if (renamedNode == null) {
+ NameNode.stateChangeLog.warning("DIR* FSDirectory.unprotectedRenameTo: "
+ +"failed to rename "+src+" to "+dst+ " because "+ src+" does not exist" );
return false;
}
renamedNode.removeNode();
@@ -683,9 +693,13 @@
}
// the renamed node can be reused now
if( rootDir.addNode(dstStr, renamedNode ) == null ) {
+ NameNode.stateChangeLog.warning("DIR* FSDirectory.unprotectedRenameTo: "
+ +"failed to rename "+src+" to "+dst );
rootDir.addNode(srcStr, renamedNode); // put it back
return false;
}
+ NameNode.stateChangeLog.fine("DIR* FSDirectory.unprotectedRenameTo: "
+ +src+" is renamed to "+dst );
return true;
}
}
@@ -738,6 +752,8 @@
* Remove the file from management, return blocks
*/
public Block[] delete(UTF8 src) {
+ NameNode.stateChangeLog.fine("DIR* FSDirectory.delete: "
+ +src );
waitForReady();
Block[] blocks = unprotectedDelete(src);
if( blocks != null )
@@ -751,6 +767,8 @@
synchronized (rootDir) {
INode targetNode = rootDir.getNode(src.toString());
if (targetNode == null) {
+ NameNode.stateChangeLog.warning("DIR* FSDirectory.unprotectedDelete: "
+ +"failed to remove "+src+" because it does not exist" );
return null;
} else {
//
@@ -758,8 +776,12 @@
// the blocks underneath the node.
//
if (! targetNode.removeNode()) {
+ NameNode.stateChangeLog.warning("DIR* FSDirectory.unprotectedDelete: "
+ +"failed to remove "+src+" because it does not have a parent" );
return null;
} else {
+ NameNode.stateChangeLog.fine("DIR* FSDirectory.unprotectedDelete: "
+ +src+" is removed" );
Vector v = new Vector();
targetNode.collectSubtreeBlocks(v);
for (Iterator it = v.iterator(); it.hasNext(); ) {
@@ -905,12 +927,17 @@
String cur = (String) v.elementAt(i);
INode inserted = unprotectedMkdir(cur);
if (inserted != null) {
+ NameNode.stateChangeLog.fine("DIR* FSDirectory.mkdirs: "
+ +"created directory "+cur );
logEdit(OP_MKDIR, new UTF8(inserted.computeName()), null);
lastSuccess = true;
} else {
lastSuccess = false;
}
}
+ if( !lastSuccess )
+ NameNode.stateChangeLog.warning("DIR* FSDirectory.mkdirs: "
+ +"failed to create directory "+src );
return lastSuccess;
}
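The FSDirectory changes all follow one convention: successful namespace
mutations log at FINE, failures at WARNING, and every message begins with a
"DIR* Class.method: " prefix so the state-change log can be filtered
mechanically. A self-contained sketch of that convention (the boolean outcome
is a stand-in for the real operation):

    import java.util.logging.Logger;

    public class StateChangeConventionSketch {
      static final Logger stateChangeLog =
          Logger.getLogger("org.apache.hadoop.dfs.StateChange");

      // Successes at FINE, failures at WARNING, prefix names the mutation.
      static boolean logMkdirs(String src, boolean created) {
        if (!created) {
          stateChangeLog.warning("DIR* FSDirectory.mkdirs: "
              + "failed to create directory " + src);
          return false;
        }
        stateChangeLog.fine("DIR* FSDirectory.mkdirs: created directory " + src);
        return true;
      }

      public static void main(String[] args) {
        logMkdirs("/data", true);
        logMkdirs("/data", false);
      }
    }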
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=400112&r1=400111&r2=400112&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Fri May 5 10:04:21 2006
@@ -284,7 +284,7 @@
short replication,
UTF8 clientName
) throws IOException {
- String text = "File " + src
+ String text = "file " + src
+ ((clientName != null) ? " on client " + clientName : "")
+ ".\n"
+ "Requested replication " + replication;
@@ -314,31 +314,38 @@
boolean overwrite,
short replication
) throws IOException {
+ NameNode.stateChangeLog.fine("DIR* NameSystem.startFile: file "
+ +src+" for "+holder+" at "+clientMachine);
try {
if (pendingCreates.get(src) != null) {
- String msg = "Cannot create file " + src + " for " + holder +
- " on " + clientMachine +
- " because pendingCreates is non-null.";
- throw new NameNode.AlreadyBeingCreatedException(msg);
+ throw new NameNode.AlreadyBeingCreatedException(
+ "failed to create file " + src + " for " + holder +
+ " on client " + clientMachine +
+ " because pendingCreates is non-null.");
}
- verifyReplication(src.toString(), replication, clientMachine );
-
+ try {
+ verifyReplication(src.toString(), replication, clientMachine );
+ } catch( IOException e) {
+ throw new IOException( "failed to create "+e.getMessage());
+ }
if (!dir.isValidToCreate(src)) {
if (overwrite) {
delete(src);
} else {
- throw new IOException("Can't create file " + src +
- ", because the filename is invalid.");
+ throw new IOException("failed to create file " + src
+ +" on client " + clientMachine
+ +" either because the filename is invalid or the file exists");
}
}
// Get the array of replication targets
DatanodeInfo targets[] = chooseTargets(replication, null, clientMachine);
if (targets.length < this.minReplication) {
- throw new IOException("Target-length is " + targets.length +
- ", below MIN_REPLICATION (" +
- minReplication+ ")");
+ throw new IOException("failed to create file "+src
+ +" on client " + clientMachine
+ +" because target-length is " + targets.length
+ +", below MIN_REPLICATION (" + minReplication+ ")");
}
// Reserve space for this pending file
@@ -346,7 +353,8 @@
new FileUnderConstruction(replication,
holder,
clientMachine));
- LOG.fine("Adding " + src + " to pendingCreates for " + holder);
+ NameNode.stateChangeLog.finer( "DIR* NameSystem.startFile: "
+ +"add "+src+" to pendingCreates for "+holder );
synchronized (leases) {
Lease lease = (Lease) leases.get(holder);
if (lease == null) {
@@ -367,7 +375,8 @@
results[1] = targets;
return results;
} catch (IOException ie) {
- LOG.warning(ie.getMessage());
+ NameNode.stateChangeLog.warning("DIR* NameSystem.startFile: "
+ +ie.getMessage());
throw ie;
}
}
@@ -386,6 +395,8 @@
public synchronized Object[] getAdditionalBlock(UTF8 src,
UTF8 clientName
) throws IOException {
+ NameNode.stateChangeLog.fine("BLOCK* NameSystem.getAdditionalBlock: file "
+ +src+" for "+clientName);
FileUnderConstruction pendingFile =
(FileUnderConstruction) pendingCreates.get(src);
// make sure that we still have the lease on this file
@@ -428,6 +439,8 @@
//
// Remove the block from the pending creates list
//
+ NameNode.stateChangeLog.fine("BLOCK* NameSystem.abandonBlock: "
+ +b.getBlockName()+"of file "+src );
FileUnderConstruction pendingFile =
(FileUnderConstruction) pendingCreates.get(src);
if (pendingFile != null) {
@@ -437,6 +450,10 @@
if (cur.compareTo(b) == 0) {
pendingCreateBlocks.remove(cur);
it.remove();
+ NameNode.stateChangeLog.finer(
+ "BLOCK* NameSystem.abandonBlock: "
+ +b.getBlockName()
+ +" is removed from pendingCreateBlock and pendingCreates");
return true;
}
}
@@ -450,7 +467,7 @@
public synchronized void abandonFileInProgress(UTF8 src,
UTF8 holder
) throws IOException {
- LOG.info("abandoning file in progress on " + src.toString());
+ NameNode.stateChangeLog.fine("DIR* NameSystem.abandonFileInProgress:" + src );
synchronized (leases) {
// find the lease
Lease lease = (Lease) leases.get(holder);
@@ -478,10 +495,12 @@
* been reported by datanodes and are replicated correctly.
*/
public synchronized int completeFile(UTF8 src, UTF8 holder) {
+ NameNode.stateChangeLog.fine("DIR* NameSystem.completeFile: " + src + " for " + holder );
if (dir.getFile(src) != null || pendingCreates.get(src) == null) {
- LOG.info( "Failed to complete " + src +
- " because dir.getFile()==" + dir.getFile(src) +
- " and " + pendingCreates.get(src));
+ NameNode.stateChangeLog.warning( "DIR* NameSystem.completeFile: "
+ + "failed to complete " + src
+ + " because dir.getFile()==" + dir.getFile(src)
+ + " and " + pendingCreates.get(src));
return OPERATION_FAILED;
} else if (! checkFileProgress(src)) {
return STILL_WAITING;
@@ -519,14 +538,14 @@
// Now we can add the (name,blocks) tuple to the filesystem
//
if ( ! dir.addFile(src, pendingBlocks, pendingFile.getReplication())) {
- System.out.println("AddFile() for " + src + " failed");
return OPERATION_FAILED;
}
// The file is no longer pending
pendingCreates.remove(src);
- LOG.fine("Removing " + src + " from pendingCreates for " + holder +
- ". (complete)");
+ NameNode.stateChangeLog.finer(
+ "DIR* NameSystem.completeFile: " + src
+ + " is removed from pendingCreates");
for (int i = 0; i < nrBlocks; i++) {
pendingCreateBlocks.remove(pendingBlocks[i]);
}
@@ -554,13 +573,11 @@
for (int i = 0; i < nrBlocks; i++) {
TreeSet containingNodes = (TreeSet) blocksMap.get(pendingBlocks[i]);
if (containingNodes.size() < pendingFile.getReplication()) {
+ NameNode.stateChangeLog.finer(
+ "DIR* NameSystem.completeFile:"
+ + pendingBlocks[i].getBlockName()+" has only "+containingNodes.size()
+ +" replicas so is added to neededReplications");
synchronized (neededReplications) {
- LOG.info("Completed file " + src
- + ", at holder " + holder
- + ". There is/are only " + containingNodes.size()
- + " copies of block " + pendingBlocks[i]
- + ", so replicating up to "
- + pendingFile.getReplication());
neededReplications.add(pendingBlocks[i]);
}
}
@@ -577,6 +594,9 @@
(FileUnderConstruction) pendingCreates.get(src);
v.getBlocks().add(b);
pendingCreateBlocks.add(b);
+ NameNode.stateChangeLog.finer("BLOCK* NameSystem.allocateBlock: "
+ +src+ ". "+b.getBlockName()+
+ " is created and added to pendingCreates and pendingCreateBlocks" );
return b;
}
@@ -613,6 +633,7 @@
* Change the indicated filename.
*/
public boolean renameTo(UTF8 src, UTF8 dst) {
+ NameNode.stateChangeLog.fine("DIR* NameSystem.renameTo: " + src + " to " + dst );
return dir.renameTo(src, dst);
}
@@ -621,6 +642,7 @@
* invalidate some blocks that make up the file.
*/
public synchronized boolean delete(UTF8 src) {
+ NameNode.stateChangeLog.fine("DIR* NameSystem.delete: " + src );
Block deletedBlocks[] = (Block[]) dir.delete(src);
if (deletedBlocks != null) {
for (int i = 0; i < deletedBlocks.length; i++) {
@@ -636,6 +658,8 @@
recentInvalidateSets.put(node.getName(), invalidateSet);
}
invalidateSet.add(b);
+ NameNode.stateChangeLog.finer("BLOCK* NameSystem.delete: "
+ + b.getBlockName() + " is added to invalidSet of " + node.getName() );
}
}
}
@@ -666,6 +690,7 @@
* Create all the necessary directories
*/
public boolean mkdirs(UTF8 src) {
+ NameNode.stateChangeLog.fine("DIR* NameSystem.mkdirs: " + src );
return dir.mkdirs(src);
}
@@ -897,15 +922,18 @@
FileUnderConstruction v =
(FileUnderConstruction) pendingCreates.remove(src);
if (v != null) {
- LOG.info("Removing " + src + " from pendingCreates for " +
- holder + " (failure)");
+ NameNode.stateChangeLog.finer(
+ "DIR* NameSystem.internalReleaseCreate: " + src
+ + " is removed from pendingCreates for "
+ + holder + " (failure)");
for (Iterator it2 = v.getBlocks().iterator(); it2.hasNext(); ) {
Block b = (Block) it2.next();
pendingCreateBlocks.remove(b);
}
} else {
- LOG.info("Attempt to release a create lock on " + src.toString()
- + " that was not in pendingCreates");
+ NameNode.stateChangeLog.warning("DIR* NameSystem.internalReleaseCreate: "
+ + "attempt to release a create lock on "+ src.toString()
+ + " that was not in pedingCreates");
}
}
@@ -949,8 +977,9 @@
DatanodeInfo nodeinfo = (DatanodeInfo) datanodeMap.get(name);
if (nodeinfo == null) {
- LOG.info("Got brand-new heartbeat from " + name);
- nodeinfo = new DatanodeInfo(name, capacity, remaining);
+ NameNode.stateChangeLog.fine("BLOCK* NameSystem.gotHeartbeat: "
+ +"brand-new heartbeat from "+name );
+ nodeinfo = new DatanodeInfo(name, capacity, remaining);
datanodeMap.put(name, nodeinfo);
capacityDiff = capacity;
remainingDiff = remaining;
@@ -995,11 +1024,13 @@
while ((heartbeats.size() > 0) &&
((nodeInfo = (DatanodeInfo) heartbeats.first()) != null) &&
(nodeInfo.lastUpdate() < System.currentTimeMillis() - EXPIRE_INTERVAL)) {
- LOG.info("Lost heartbeat for " + nodeInfo.getName());
-
heartbeats.remove(nodeInfo);
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.heartbeatCheck: "
+ + "lost heartbeat from " + nodeInfo.getName());
synchronized (datanodeMap) {
datanodeMap.remove(nodeInfo.getName());
+ NameNode.stateChangeLog.finer("BLOCK* NameSystem.heartbeatCheck: "
+ + nodeInfo.getName() + " is removed from datanodeMap");
}
totalCapacity -= nodeInfo.getCapacity();
totalRemaining -= nodeInfo.getRemaining();
@@ -1023,8 +1054,12 @@
* update the (machine-->blocklist) and (block-->machinelist) tables.
*/
public synchronized Block[] processReport(Block newReport[], UTF8 name) {
+ NameNode.stateChangeLog.fine("BLOCK* NameSystem.processReport: "
+ +"from "+name+" "+newReport.length+" blocks" );
DatanodeInfo node = (DatanodeInfo) datanodeMap.get(name);
if (node == null) {
+ NameNode.stateChangeLog.severe("BLOCK* NameSystem.processReport: "
+ +"from "+name+" but can not find its info" );
throw new IllegalArgumentException("Unexpected exception. Received block report from node " + name + ", but there is no info for " + name);
}
@@ -1084,8 +1119,9 @@
Block b = (Block) it.next();
if (! dir.isValidBlock(b) && ! pendingCreateBlocks.contains(b)) {
- LOG.info("Obsoleting block " + b);
obsolete.add(b);
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: "
+ +"ask "+name+" to delete "+b.getBlockName() );
}
}
return (Block[]) obsolete.toArray(new Block[obsolete.size()]);
@@ -1103,8 +1139,19 @@
}
if (! containingNodes.contains(node)) {
containingNodes.add(node);
+ //
+ // Hairong: I would prefer to log the next record at FINER,
+ // but at startup so many new blocks come in that they would
+ // take up all the space in the log file, so it is logged
+ // at FINEST instead.
+ //
+ NameNode.stateChangeLog.finest("BLOCK* NameSystem.addStoredBlock: "
+ +"blockMap updated: "+node.getName()+" is added to "+block.getBlockName() );
} else {
- LOG.info("Redundant addStoredBlock request received for block " + block + " on node " + node);
+ NameNode.stateChangeLog.warning("BLOCK* NameSystem.addStoredBlock: "
+ + "Redundant addStoredBlock request received for "
+ + block.getBlockName() + " on " + node.getName());
}
synchronized (neededReplications) {
@@ -1115,8 +1162,15 @@
if (containingNodes.size() >= fileReplication ) {
neededReplications.remove(block);
pendingReplications.remove(block);
- } else // containingNodes.size() < fileReplication
+ NameNode.stateChangeLog.finest("BLOCK* NameSystem.addStoredBlock: "
+ +block.getBlockName()+" has "+containingNodes.size()
+ +" replicas so is removed from neededReplications and pendingReplications" );
+ } else {// containingNodes.size() < fileReplication
neededReplications.add(block);
+ NameNode.stateChangeLog.finer("BLOCK* NameSystem.addStoredBlock: "
+ +block.getBlockName()+" has only "+containingNodes.size()
+ +" replicas so is added to neededReplications" );
+ }
proccessOverReplicatedBlock( block, fileReplication );
}
@@ -1161,6 +1215,8 @@
excessReplicateMap.put(cur.getName(), excessBlocks);
}
excessBlocks.add(b);
+ NameNode.stateChangeLog.finer("BLOCK* NameSystem.chooseExcessReplicates: "
+ +"("+cur.getName()+", "+b.getBlockName()+") is added to excessReplicateMap" );
//
// The 'excessblocks' tracks blocks until we get confirmation
@@ -1177,6 +1233,8 @@
recentInvalidateSets.put(cur.getName(), invalidateSet);
}
invalidateSet.add(b);
+ NameNode.stateChangeLog.finer("BLOCK* NameSystem.chooseExcessReplicates: "
+ +"("+cur.getName()+", "+b.getBlockName()+") is added to recentInvalidateSets" );
}
}
@@ -1185,12 +1243,13 @@
* replication tasks, if the removed block is still valid.
*/
synchronized void removeStoredBlock(Block block, DatanodeInfo node) {
+ NameNode.stateChangeLog.fine("BLOCK* NameSystem.removeStoredBlock: "
+ +block.getBlockName() + " from "+node.getName() );
TreeSet containingNodes = (TreeSet) blocksMap.get(block);
if (containingNodes == null || ! containingNodes.contains(node)) {
throw new IllegalArgumentException("No machine mapping found for block " + block + ", which should be at node " + node);
}
containingNodes.remove(node);
-
//
// It's possible that the block was removed because of a datanode
// failure. If the block is still valid, check if replication is
@@ -1202,6 +1261,9 @@
synchronized (neededReplications) {
neededReplications.add(block);
}
+ NameNode.stateChangeLog.finer("BLOCK* NameSystem.removeStoredBlock: "
+ +block.getBlockName()+" has only "+containingNodes.size()
+ +" replicas so is added to neededReplications" );
}
//
@@ -1211,6 +1273,8 @@
TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(node.getName());
if (excessBlocks != null) {
excessBlocks.remove(block);
+ NameNode.stateChangeLog.finer("BLOCK* NameSystem.removeStoredBlock: "
+ +block.getBlockName()+" is removed from excessBlocks" );
if (excessBlocks.size() == 0) {
excessReplicateMap.remove(node.getName());
}
@@ -1223,8 +1287,12 @@
public synchronized void blockReceived(Block block, UTF8 name) {
DatanodeInfo node = (DatanodeInfo) datanodeMap.get(name);
if (node == null) {
+ NameNode.stateChangeLog.warning("BLOCK* NameSystem.blockReceived: "
+ +block.getBlockName()+" is received from an unrecorded node " + name );
throw new IllegalArgumentException("Unexpected exception. Got blockReceived message from node " + name + ", but there is no info for " + name);
}
+ NameNode.stateChangeLog.fine("BLOCK* NameSystem.blockReceived: "
+ +block.getBlockName()+" is received from " + name );
//
// Modify the blocks->datanode map
//
@@ -1279,11 +1347,20 @@
*/
public synchronized Block[] blocksToInvalidate(UTF8 sender) {
Vector invalidateSet = (Vector) recentInvalidateSets.remove(sender);
- if (invalidateSet != null) {
- return (Block[]) invalidateSet.toArray(new Block[invalidateSet.size()]);
- } else {
+
+ if (invalidateSet == null )
return null;
+
+ if(NameNode.stateChangeLog.isLoggable(Level.INFO)) {
+ StringBuffer blockList = new StringBuffer();
+ for( int i=0; i<invalidateSet.size(); i++ ) {
+ blockList.append(' ');
+ blockList.append(((Block)invalidateSet.elementAt(i)).getBlockName());
+ }
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.blockToInvalidate: "
+ +"ask "+sender+" to delete " + blockList );
}
+ return (Block[]) invalidateSet.toArray(new Block[invalidateSet.size()]);
}
/**
@@ -1299,7 +1376,7 @@
public synchronized Object[] pendingTransfers(DatanodeInfo srcNode, int xmitsInProgress) {
synchronized (neededReplications) {
Object results[] = null;
- int scheduledXfers = 0;
+ int scheduledXfers = 0;
if (neededReplications.size() > 0) {
//
@@ -1334,7 +1411,7 @@
// Build items to return
replicateBlocks.add(block);
replicateTargetSets.add(targets);
- scheduledXfers += targets.length;
+ scheduledXfers += targets.length;
}
}
}
@@ -1356,14 +1433,22 @@
if (containingNodes.size() + targets.length >= dir.getFileByBlock(block).getReplication()) {
neededReplications.remove(block);
pendingReplications.add(block);
+ NameNode.stateChangeLog.finer("BLOCK* NameSystem.pendingTransfer: "
+ +block.getBlockName()
+ +" is removed from neededReplications to pendingReplications" );
}
- LOG.info("Pending transfer (block "
- + block.getBlockName()
- + ") from " + srcNode.getName()
- + " to " + targets[0].getName()
- + (targets.length > 1 ? " and " + (targets.length-1)
- + " more destination(s)" : "" ));
+ if(NameNode.stateChangeLog.isLoggable(Level.INFO)) {
+ StringBuffer targetList = new StringBuffer( "datanode(s)");
+ for(int k=0; k<targets.length; k++) {
+ targetList.append(' ');
+ targetList.append(targets[k].getName());
+ }
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.pendingTransfer: "
+ +"ask "+srcNode.getName()
+ +" to replicate "+block.getBlockName()
+ +" to "+targetList);
+ }
}
//
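Note how blocksToInvalidate() and pendingTransfers() above guard message
construction behind isLoggable(), so the block-list concatenation is only paid
for when the level is actually enabled. The same pattern in isolation (block
names are illustrative):

    import java.util.logging.Level;
    import java.util.logging.Logger;

    public class GuardedLogSketch {
      public static void main(String[] args) {
        Logger log = Logger.getLogger("org.apache.hadoop.dfs.StateChange");
        String[] blockNames = { "blk_1", "blk_2", "blk_3" };
        if (log.isLoggable(Level.INFO)) {
          // Build the potentially long list only when INFO is enabled.
          StringBuffer blockList = new StringBuffer();
          for (int i = 0; i < blockNames.length; i++) {
            blockList.append(' ');
            blockList.append(blockNames[i]);
          }
          log.info("BLOCK* ask datanode to delete" + blockList);
        }
      }
    }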
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=400112&r1=400111&r2=400112&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Fri May 5 10:04:21 2006
@@ -57,6 +57,7 @@
**********************************************************/
public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.dfs.NameNode");
+ public static final Logger stateChangeLog = LogFormatter.getLogger( "org.apache.hadoop.dfs.StateChange");
private FSNamesystem namesystem;
private Server server;
@@ -182,7 +183,9 @@
boolean overwrite,
short replication
) throws IOException {
- Object results[] = namesystem.startFile(new UTF8(src),
+ stateChangeLog.fine("*DIR* NameNode.create: file "
+ +src+" for "+clientName+" at "+clientMachine);
+ Object results[] = namesystem.startFile(new UTF8(src),
new UTF8(clientName),
new UTF8(clientMachine),
overwrite,
@@ -202,6 +205,8 @@
*/
public LocatedBlock addBlock(String src,
String clientName) throws IOException {
+ stateChangeLog.fine("*BLOCK* NameNode.addBlock: file "
+ +src+" for "+clientName);
UTF8 src8 = new UTF8(src);
UTF8 client8 = new UTF8(clientName);
Object[] results = namesystem.getAdditionalBlock(src8, client8);
@@ -216,8 +221,12 @@
* to prevent weird heartbeat race conditions.
*/
public void reportWrittenBlock(LocatedBlock lb) throws IOException {
- Block b = lb.getBlock();
+ Block b = lb.getBlock();
DatanodeInfo targets[] = lb.getLocations();
+ stateChangeLog.fine("*BLOCK* NameNode.reportWrittenBlock"
+ +": " + b.getBlockName() +" is written to "
+ +targets.length + " locations" );
+
for (int i = 0; i < targets.length; i++) {
namesystem.blockReceived(b, targets[i].getName());
}
@@ -227,6 +236,8 @@
* The client needs to give up on the block.
*/
public void abandonBlock(Block b, String src) throws IOException {
+ stateChangeLog.fine("*BLOCK* NameNode.abandonBlock: "
+ +b.getBlockName()+" of file "+src );
if (! namesystem.abandonBlock(b, new UTF8(src))) {
throw new IOException("Cannot abandon block during write to " + src);
}
@@ -235,11 +246,13 @@
*/
public void abandonFileInProgress(String src,
String holder) throws IOException {
+ stateChangeLog.fine("*DIR* NameNode.abandonFileInProgress:" + src );
namesystem.abandonFileInProgress(new UTF8(src), new UTF8(holder));
}
/**
*/
public boolean complete(String src, String clientName) throws IOException {
+ stateChangeLog.fine("*DIR* NameNode.complete: " + src + " for " + clientName );
int returnCode = namesystem.completeFile(new UTF8(src), new UTF8(clientName));
if (returnCode == STILL_WAITING) {
return false;
@@ -269,12 +282,14 @@
/**
*/
public boolean rename(String src, String dst) throws IOException {
+ stateChangeLog.fine("*DIR* NameNode.rename: " + src + " to " + dst );
return namesystem.renameTo(new UTF8(src), new UTF8(dst));
}
/**
*/
public boolean delete(String src) throws IOException {
+ stateChangeLog.fine("*DIR* NameNode.delete: " + src );
return namesystem.delete(new UTF8(src));
}
@@ -293,6 +308,7 @@
/**
*/
public boolean mkdirs(String src) throws IOException {
+ stateChangeLog.fine("*DIR* NameNode.mkdirs: " + src );
return namesystem.mkdirs(new UTF8(src));
}
@@ -404,6 +420,8 @@
}
public Block[] blockReport(String sender, Block blocks[]) {
+ stateChangeLog.fine("*BLOCK* NameNode.blockReport: "
+ +"from "+sender+" "+blocks.length+" blocks" );
if( firstBlockReportTime==0)
firstBlockReportTime=System.currentTimeMillis();
@@ -411,6 +429,8 @@
}
public void blockReceived(String sender, Block blocks[]) {
+ stateChangeLog.fine("*BLOCK* NameNode.blockReceived: "
+ +"from "+sender+" "+blocks.length+" blocks." );
for (int i = 0; i < blocks.length; i++) {
namesystem.blockReceived(blocks[i], new UTF8(sender));
}
@@ -441,6 +461,19 @@
System.err.println("Formatted "+dir);
System.exit(0);
}
+
+ LogFormatter.initFileHandler( conf, "namenode" );
+ LogFormatter.setShowThreadIDs(true);
+ String confLevel = conf.get("dfs.namenode.logging.level", "info");
+ Level level;
+ if( confLevel.equals( "dir"))
+ level=Level.FINE;
+ else if( confLevel.equals( "block"))
+ level=Level.FINER;
+ else if( confLevel.equals( "all"))
+ level=Level.FINEST;
+ else level=Level.INFO;
+ stateChangeLog.setLevel( level);
NameNode namenode = new NameNode(conf);
namenode.join();
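The level mapping in NameNode.main() is the knob that
dfs.namenode.logging.level controls; pulled out as a standalone helper it
reads:

    import java.util.logging.Level;

    public class StateChangeLevelSketch {
      // Mirrors the mapping applied to dfs.namenode.logging.level above.
      static Level stateChangeLevel(String confLevel) {
        if (confLevel.equals("dir"))   return Level.FINE;   // namespace mutations
        if (confLevel.equals("block")) return Level.FINER;  // block changes too
        if (confLevel.equals("all"))   return Level.FINEST; // everything
        return Level.INFO;                                  // default
      }

      public static void main(String[] args) {
        System.out.println(stateChangeLevel("block")); // FINER
      }
    }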
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=400112&r1=400111&r2=400112&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Fri May 5 10:04:21 2006
@@ -1015,6 +1015,8 @@
System.exit(-1);
}
- startTracker(new Configuration());
+ Configuration conf=new Configuration();
+ LogFormatter.initFileHandler( conf, "jobtracker" );
+ startTracker(conf);
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=400112&r1=400111&r2=400112&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri May 5 10:04:21 2006
@@ -839,7 +839,9 @@
System.exit(-1);
}
- TaskTracker tt = new TaskTracker(new JobConf());
+ JobConf conf=new JobConf();
+ LogFormatter.initFileHandler( conf, "tasktracker" );
+ TaskTracker tt = new TaskTracker(conf);
tt.run();
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/LogFormatter.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/LogFormatter.java?rev=400112&r1=400111&r2=400112&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/LogFormatter.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/LogFormatter.java Fri May 5 10:04:21 2006
@@ -18,9 +18,12 @@
import java.util.logging.*;
import java.io.*;
+import java.net.InetAddress;
import java.text.*;
import java.util.Date;
+import org.apache.hadoop.conf.Configuration;
+
/** Prints just the date and the log message. */
public class LogFormatter extends Formatter {
@@ -34,7 +37,7 @@
private static boolean showTime = true;
private static boolean showThreadIDs = false;
-
+
// install when this class is loaded
static {
Handler[] handlers = LogFormatter.getLogger("").getHandlers();
@@ -44,6 +47,63 @@
}
}
+ public static String initFileHandler( Configuration conf, String opName )
+ throws IOException {
+ String logDir=System.getenv("HADOOP_LOG_DIR");
+ String userHome=System.getProperty("user.dir");
+ if( logDir==null ) {
+ logDir=System.getenv("HADOOP_HOME");
+ if(logDir==null) {
+ logDir=userHome;
+ } else {
+ logDir+=File.separator+"logs";
+ }
+ }
+
+ if(!logDir.equals(userHome)) {
+ File logDirFile = new File( logDir );
+ if(!logDirFile.exists()) {
+ if(!logDirFile.mkdirs()) {
+ logDir=userHome;
+ }
+ } else if( !logDirFile.isDirectory()) {
+ logDir=userHome;
+ }
+ }
+
+ String hostname;
+ try {
+ hostname=InetAddress.getLocalHost().getHostName();
+ int index=hostname.indexOf('.');
+ if( index != -1 ) {
+ hostname=hostname.substring(0, index);
+ }
+ } catch (java.net.UnknownHostException e) {
+ hostname="localhost";
+ }
+
+ String logFile = logDir+File.separator+"hadoop-"+System.getProperty( "user.name" )
+ +"-"+opName+"-"+hostname+".log";
+
+ int logFileSize = conf.getInt( "hadoop.logfile.size", 10000000 );
+ int logFileCount = conf.getInt( "hadoop.logfile.count", 10 );
+
+ FileHandler fh=new FileHandler(logFile, logFileSize, logFileCount, false);
+ fh.setFormatter(new LogFormatter());
+ fh.setLevel(Level.FINEST);
+
+ Logger rootLogger = LogFormatter.getLogger("");
+ rootLogger.info( "directing logs to directory "+logDir );
+
+ Handler[] handlers = rootLogger.getHandlers();
+ for( int i=0; i<handlers.length; i++ ) {
+ rootLogger.removeHandler( handlers[i]);
+ }
+ rootLogger.addHandler(fh);
+
+ return logFile;
+ }
+
/** Gets a logger and, as a side effect, installs this as the default
* formatter. */
public static Logger getLogger(String name) {
@@ -77,8 +137,11 @@
// the thread id
if (showThreadIDs) {
- buffer.append(" ");
- buffer.append(record.getThreadID());
+ buffer.append(" 0x");
+ String threadID = Integer.toHexString(record.getThreadID());
+ for (int i = 0; i < 8 - threadID.length(); i++)
+ buffer.append('0');
+ buffer.append(threadID);
}
// handle SEVERE specially
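The formatter change prints thread IDs as zero-padded 8-digit hex so the log
columns line up. The padding logic in isolation:

    public class ThreadIdSketch {
      public static void main(String[] args) {
        int threadID = 42; // illustrative; comes from LogRecord.getThreadID()
        StringBuffer buffer = new StringBuffer(" 0x");
        String hex = Integer.toHexString(threadID);
        for (int i = 0; i < 8 - hex.length(); i++)
          buffer.append('0');
        buffer.append(hex);
        System.out.println(buffer); // " 0x0000002a"
      }
    }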
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java?rev=400112&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java Fri May 5 10:04:21 2006
@@ -0,0 +1,467 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.dfs;
+
+import junit.framework.TestCase;
+import junit.framework.AssertionFailedError;
+import org.apache.hadoop.fs.FSOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.util.LogFormatter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.NameNode;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Test DFS logging
+ * make sure that any namespace mutations are logged.
+ * @author Hairong Kuang
+ */
+public class ClusterTestDFSNamespaceLogging extends TestCase implements FSConstants {
+ private static final Logger LOG =
+ LogFormatter.getLogger("org.apache.hadoop.dfs.ClusterTestDFS");
+
+ private static Configuration conf = new Configuration();
+
+ /**
+ * all DFS test files go under this base directory
+ */
+ private static String baseDirSpecified=conf.get("test.dfs.data", "/tmp/test-dfs");
+
+ /**
+ * base dir as File
+ */
+ private static File baseDir=new File(baseDirSpecified);
+
+ /**
+ * name node port
+ */
+ int nameNodePort = conf.getInt("dfs.namenode.port", 9020);
+
+ /** DFS client, datanodes, and namenode
+ */
+ DFSClient dfsClient;
+ ArrayList dataNodeDaemons = new ArrayList();
+ NameNode nameNodeDaemon;
+
+ /** Log header length
+ */
+ private static final int DIR_LOG_HEADER_LEN = 30;
+ private static final int BLOCK_LOG_HEADER_LEN = 32;
+ /** DFS block size
+ */
+ private static final int BLOCK_SIZE = 32*1000*1000;
+
+ /** Buffer size
+ */
+ private static final int BUFFER_SIZE = 4096;
+
+ private BufferedReader logfh;
+ private String logFile;
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ conf.setBoolean("test.dfs.same.host.targets.allowed", true);
+ }
+
+ /**
+ * Remove old files from temp area used by this test case and be sure
+ * base temp directory can be created.
+ */
+ protected void prepareTempFileSpace() {
+ if (baseDir.exists()) {
+ try { // start from a blank state
+ FileUtil.fullyDelete(baseDir);
+ } catch (Exception ignored) {
+ }
+ }
+ baseDir.mkdirs();
+ if (!baseDir.isDirectory()) {
+ throw new RuntimeException("Value of root directory property"
+ + "test.dfs.data for dfs test is not a directory: "
+ + baseDirSpecified);
+ }
+ }
+
+ /**
+ * Pseudo Distributed FS Test.
+ * Test DFS by running all the necessary daemons in one process.
+ *
+ * @throws Exception
+ */
+ public void testFsPseudoDistributed() throws Exception {
+ // test on a small cluster with 3 data nodes
+ testFsPseudoDistributed(3);
+ }
+
+ private void testFsPseudoDistributed( int datanodeNum ) throws Exception {
+ try {
+ prepareTempFileSpace();
+
+ configureDFS();
+ startDFS(datanodeNum);
+
+ if( logfh == null )
+ try {
+ logfh = new BufferedReader( new FileReader( logFile ) );
+ } catch (FileNotFoundException e) {
+ throw new AssertionFailedError("Log file does not exist: "+logFile);
+ }
+
+ // create a directory
+ try {
+ dfsClient.mkdirs( new UTF8( "/data") );
+ assertMkdirs( "/data", false );
+ } catch ( IOException ioe ) {
+ ioe.printStackTrace();
+ }
+
+ try {
+ dfsClient.mkdirs( new UTF8( "data") );
+ assertMkdirs( "data", true );
+ } catch ( IOException ioe ) {
+ ioe.printStackTrace();
+ }
+
+ //
+ // create a file with 1 data block
+ try {
+ createFile("/data/xx", 1);
+ assertCreate( "/data/xx", 1, false );
+ } catch( IOException ioe ) {
+ assertCreate( "/data/xx", 1, true );
+ }
+
+ // create a file with 2 data blocks
+ try {
+ createFile("/data/yy",BLOCK_SIZE+1);
+ assertCreate( "/data/yy", BLOCK_SIZE+1, false );
+ } catch( IOException ioe ) {
+ assertCreate( "/data/yy", BLOCK_SIZE+1, true );
+ }
+
+ // create an existing file
+ try {
+ createFile("/data/xx", 2);
+ assertCreate( "/data/xx", 2, false );
+ } catch( IOException ioe ) {
+ assertCreate( "/data/xx", 2, true );
+ }
+
+ // delete the file
+ try {
+ dfsClient.delete( new UTF8("/data/yy") );
+ assertDelete("/data/yy", false);
+ } catch( IOException ioe ) {
+ ioe.printStackTrace();
+ }
+
+
+ // rename the file
+ try {
+ dfsClient.rename( new UTF8("/data/xx"), new UTF8("/data/yy") );
+ assertRename( "/data/xx", "/data/yy", false );
+ } catch( IOException ioe ) {
+ ioe.printStackTrace();
+ }
+
+ try {
+ dfsClient.delete(new UTF8("/data/xx"));
+ assertDelete("/data/xx", true);
+ } catch(IOException ioe) {
+ ioe.printStackTrace();
+ }
+
+ try {
+ dfsClient.rename( new UTF8("/data/xx"), new UTF8("/data/yy") );
+ assertRename( "/data/xx", "/data/yy", true );
+ } catch( IOException ioe) {
+ ioe.printStackTrace();
+ }
+
+ } catch (AssertionFailedError afe) {
+ afe.printStackTrace();
+ throw afe;
+ } catch (Throwable t) {
+ msg("Unexpected exception_a: " + t);
+ t.printStackTrace();
+ } finally {
+ shutdownDFS();
+
+ }
+ }
+
+ private void createFile( String filename, long fileSize ) throws IOException {
+ //
+ // write filesize of data to file
+ //
+ byte[] buffer = new byte[BUFFER_SIZE];
+ UTF8 testFileName = new UTF8(filename);
+ FSOutputStream nos;
+ nos = dfsClient.create(testFileName, false);
+ try {
+ for (long nBytesWritten = 0L;
+ nBytesWritten < fileSize;
+ nBytesWritten += buffer.length) {
+ if ((nBytesWritten + buffer.length) > fileSize) {
+ int pb = (int) (fileSize - nBytesWritten);
+ byte[] bufferPartial = new byte[pb];
+ for( int i=0; i<pb; i++) {
+ bufferPartial[i]='a';
+ }
+ nos.write(bufferPartial);
+ } else {
+ for( int i=0; i<buffer.length;i++) {
+ buffer[i]='a';
+ }
+ nos.write(buffer);
+ }
+ }
+ } finally {
+ nos.flush();
+ nos.close();
+ }
+ }
+
+ private void assertMkdirs( String fileName, boolean failed ) {
+ assertHasLogged("NameNode.mkdirs: " +fileName, DIR_LOG_HEADER_LEN+1);
+ assertHasLogged("NameSystem.mkdirs: "+fileName, DIR_LOG_HEADER_LEN);
+ if( failed )
+ assertHasLogged("FSDirectory.mkdirs: "
+ +"failed to create directory "+fileName, DIR_LOG_HEADER_LEN);
+ else
+ assertHasLogged( "FSDirectory.mkdirs: created directory "+fileName, DIR_LOG_HEADER_LEN);
+ }
+
+ private void assertCreate( String fileName, int filesize, boolean failed ) {
+ assertHasLogged("NameNode.create: file "+fileName, DIR_LOG_HEADER_LEN+1);
+ assertHasLogged("NameSystem.startFile: file "+fileName, DIR_LOG_HEADER_LEN);
+ if( failed ) {
+ assertHasLogged("NameSystem.startFile: "
+ +"failed to create file " + fileName, DIR_LOG_HEADER_LEN);
+ } else {
+ assertHasLogged("NameSystem.allocateBlock: "+fileName, BLOCK_LOG_HEADER_LEN);
+ int blockNum = (filesize/BLOCK_SIZE*BLOCK_SIZE==filesize)?
+ filesize/BLOCK_SIZE : 1+filesize/BLOCK_SIZE;
+ for( int i=1; i<blockNum; i++) {
+ assertHasLogged("NameNode.addBlock: file "+fileName, BLOCK_LOG_HEADER_LEN+1);
+ assertHasLogged("NameSystem.getAdditionalBlock: file "+fileName, BLOCK_LOG_HEADER_LEN);
+ assertHasLogged("NameSystem.allocateBlock: "+fileName, BLOCK_LOG_HEADER_LEN);
+ }
+ assertHasLogged("NameNode.complete: "+fileName, DIR_LOG_HEADER_LEN+1);
+ assertHasLogged("NameSystem.completeFile: "+fileName, DIR_LOG_HEADER_LEN);
+ assertHasLogged("FSDirectory.addFile: "+fileName+" with "
+ +blockNum+" blocks is added to the file system", DIR_LOG_HEADER_LEN);
+ assertHasLogged("NameSystem.completeFile: "+fileName
+ +" is removed from pendingCreates", DIR_LOG_HEADER_LEN);
+ }
+ }
+
+ private void assertDelete( String fileName, boolean failed ) {
+ assertHasLogged("NameNode.delete: "+fileName, DIR_LOG_HEADER_LEN+1);
+ assertHasLogged("NameSystem.delete: "+fileName, DIR_LOG_HEADER_LEN);
+ assertHasLogged("FSDirectory.delete: "+fileName, DIR_LOG_HEADER_LEN);
+ if( failed )
+ assertHasLogged("FSDirectory.unprotectedDelete: "
+ +"failed to remove "+fileName, DIR_LOG_HEADER_LEN );
+ else
+ assertHasLogged("FSDirectory.unprotectedDelete: "
+ +fileName+" is removed", DIR_LOG_HEADER_LEN);
+ }
+
+ private void assertRename( String src, String dst, boolean failed ) {
+ assertHasLogged("NameNode.rename: "+src+" to "+dst, DIR_LOG_HEADER_LEN+1);
+ assertHasLogged("NameSystem.renameTo: "+src+" to "+dst, DIR_LOG_HEADER_LEN );
+ assertHasLogged("FSDirectory.renameTo: "+src+" to "+dst, DIR_LOG_HEADER_LEN );
+ if( failed )
+ assertHasLogged("FSDirectory.unprotectedRenameTo: "
+ +"failed to rename "+src+" to "+dst, DIR_LOG_HEADER_LEN);
+ else
+ assertHasLogged("FSDirectory.unprotectedRenameTo: "
+ +src+" is renamed to "+dst, DIR_LOG_HEADER_LEN );
+ }
+
+ private void assertHasLogged( String target, int headerLen ) {
+ String line;
+ boolean notFound = true;
+ try {
+ while( notFound && (line=logfh.readLine()) != null ) {
+ if(line.length()>headerLen && line.startsWith(target, headerLen))
+ notFound = false;
+ }
+ } catch(java.io.IOException e) {
+ throw new AssertionFailedError("error reading the log file");
+ }
+ if(notFound) {
+ throw new AssertionFailedError(target+" not logged");
+ }
+ }
+
+ //
+ // modify config for test
+ //
+ private void configureDFS() throws IOException {
+ // set given config param to override other config settings
+ conf.setInt("test.dfs.block_size", BLOCK_SIZE);
+ // verify that config changed
+ assertTrue(BLOCK_SIZE == conf.getInt("test.dfs.block_size", 2)); // 2 is an intentional obviously-wrong block size
+ // downsize for testing (just to save resources)
+ conf.setInt("dfs.namenode.handler.count", 3);
+ conf.setLong("dfs.blockreport.intervalMsec", 50*1000L);
+ conf.setLong("dfs.datanode.startupMsec", 15*1000L);
+ conf.setInt("dfs.replication", 2);
+ //System.setProperty("HADOOP_LOG_DIR", baseDirSpecified+"/logs");
+ conf.setInt("hadoop.logfile.count", 1);
+ conf.setInt("hadoop.logfile.size", 1000000000);
+
+ // logging configuration for namenode
+ logFile = LogFormatter.initFileHandler( conf, "namenode" );
+ LogFormatter.setShowThreadIDs(true);
+ NameNode.stateChangeLog.setLevel( Level.FINEST);
+ }
+
+ private void startDFS( int dataNodeNum) throws IOException {
+ //
+ // start a NameNode
+ String nameNodeSocketAddr = "localhost:" + nameNodePort;
+ conf.set("fs.default.name", nameNodeSocketAddr);
+
+ String nameFSDir = baseDirSpecified + "/name";
+ conf.set("dfs.name.dir", nameFSDir);
+
+ NameNode.format(conf);
+
+ nameNodeDaemon = new NameNode(new File(nameFSDir), nameNodePort, conf);
+
+ //
+ // start DataNodes
+ //
+ for (int i = 0; i < dataNodeNum; i++) {
+ // uniquely config real fs path for data storage for this datanode
+ String dataDir = baseDirSpecified + "/datanode" + i;
+ conf.set("dfs.data.dir", dataDir);
+ DataNode dn = DataNode.makeInstanceForDir(dataDir, conf);
+ if (dn != null) {
+ dataNodeDaemons.add(dn);
+ (new Thread(dn, "DataNode" + i + ": " + dataDir)).start();
+ }
+ }
+
+ assertTrue("incorrect datanodes for test to continue",
+ (dataNodeDaemons.size() == dataNodeNum));
+ //
+ // wait for datanodes to report in
+ try {
+ awaitQuiescence();
+ } catch( InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ // act as if namenode is a remote process
+ dfsClient = new DFSClient(new InetSocketAddress("localhost", nameNodePort), conf);
+ }
+
+ private void shutdownDFS() {
+ // shutdown client
+ if (dfsClient != null) {
+ try {
+ msg("close down subthreads of DFSClient");
+ dfsClient.close();
+ } catch (Exception ignored) { }
+ msg("finished close down of DFSClient");
+ }
+
+ //
+ // shut down datanode daemons (this takes advantage of being same-process)
+ msg("begin shutdown of all datanode daemons" );
+
+ for (int i = 0; i < dataNodeDaemons.size(); i++) {
+ DataNode dataNode = (DataNode) dataNodeDaemons.get(i);
+ try {
+ dataNode.shutdown();
+ } catch (Exception e) {
+ msg("ignoring exception during (all) datanode shutdown, e=" + e);
+ }
+ }
+ msg("finished shutdown of all datanode daemons");
+
+ // shutdown namenode
+ msg("begin shutdown of namenode daemon");
+ try {
+ nameNodeDaemon.stop();
+ } catch (Exception e) {
+ msg("ignoring namenode shutdown exception=" + e);
+ }
+ msg("finished shutdown of namenode daemon");
+ }
+
+ /** Wait for the DFS datanodes to become quiescent.
+ * The initial implementation is to sleep for some fixed amount of time,
+ * but a better implementation would be to really detect when distributed
+ * operations are completed.
+ * @throws InterruptedException
+ */
+ private void awaitQuiescence() throws InterruptedException {
+ // ToDo: Need observer pattern, not static sleep
+ // Doug suggested that the block report interval could be made shorter
+ // and then observing that would be a good way to know when an operation
+ // was complete (quiescence detect).
+ sleepAtLeast(30000);
+ }
+
+ private void msg(String s) {
+ //System.out.println(s);
+ LOG.info(s);
+ }
+
+ public static void sleepAtLeast(int tmsec) {
+ long t0 = System.currentTimeMillis();
+ long t1 = t0;
+ long tslept = t1 - t0;
+ while (tmsec > tslept) {
+ try {
+ long tsleep = tmsec - tslept;
+ Thread.sleep(tsleep);
+ t1 = System.currentTimeMillis();
+ } catch (InterruptedException ie) {
+ t1 = System.currentTimeMillis();
+ }
+ tslept = t1 - t0;
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ String usage = "Usage: ClusterTestDFSNameSpaceChangeLogging (no args)";
+ if (args.length != 0) {
+ System.err.println(usage);
+ System.exit(-1);
+ }
+ String[] testargs = {"org.apache.hadoop.dfs.ClusterTestDFSNamespaceLogging"};
+ junit.textui.TestRunner.main(testargs);
+ }
+
+}