You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/04/03 23:39:27 UTC
svn commit: r525290 [1/3] - in /lucene/hadoop/trunk: ./ bin/
src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/fs/
src/test/org/apache/hadoop/dfs/
Author: cutting
Date: Tue Apr 3 14:39:25 2007
New Revision: 525290
URL: http://svn.apache.org/viewvc?view=rev&rev=525290
Log:
HADOOP-702. Add tools to help automate HDFS upgrades. Contributed by Konstantin.
Added:
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/InconsistentFSStateException.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamespaceInfo.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Storage.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSFinalize.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSRollback.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSStartupVersions.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSStorageStateRecovery.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSUpgrade.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/UpgradeUtilities.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/bin/start-dfs.sh
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeRegistration.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/IncorrectVersionException.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SecondaryNameNode.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFS.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCorruption.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Apr 3 14:39:25 2007
@@ -94,6 +94,9 @@
29. HADOOP-1156. Fix a NullPointerException in MiniDFSCluster.
(Hairong Kuang via cutting)
+30. HADOOP-702. Add tools to help automate HDFS upgrades.
+ (Konstantin Shvachko via cutting)
+
Release 0.12.3 (not yet released)
Modified: lucene/hadoop/trunk/bin/start-dfs.sh
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/bin/start-dfs.sh?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/bin/start-dfs.sh (original)
+++ lucene/hadoop/trunk/bin/start-dfs.sh Tue Apr 3 14:39:25 2007
@@ -1,15 +1,37 @@
#!/bin/sh
-# Start hadoop dfs daemons. Run this on master node.
+# Start hadoop dfs daemons.
+# Optionally upgrade or roll back dfs state.
+# Run this on master node.
+##
+
+usage="Usage: start-dfs.sh [-upgrade|-rollback]"
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
. "$bin"/hadoop-config.sh
+# get arguments
+if [ $# -ge 1 ]; then
+ nameStartOpt=$1
+ shift
+ case $nameStartOpt in
+ (-upgrade)
+ ;;
+ (-rollback)
+ dataStartOpt=$nameStartOpt
+ ;;
+ (*)
+ echo $usage
+ exit 1
+ ;;
+ esac
+fi
+
# start dfs daemons
# start namenode after datanodes, to minimize time namenode is up w/o data
# note: datanodes will log connection errors until namenode starts
-"$bin"/hadoop-daemon.sh --config $HADOOP_CONF_DIR start namenode
-"$bin"/hadoop-daemons.sh --config $HADOOP_CONF_DIR start datanode
+"$bin"/hadoop-daemon.sh --config $HADOOP_CONF_DIR start namenode $nameStartOpt
+"$bin"/hadoop-daemons.sh --config $HADOOP_CONF_DIR start datanode $dataStartOpt
"$bin"/hadoop-daemons.sh --config $HADOOP_CONF_DIR --hosts masters start secondarynamenode
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java Tue Apr 3 14:39:25 2007
@@ -17,9 +17,40 @@
*/
package org.apache.hadoop.dfs;
+import java.io.*;
import org.apache.hadoop.io.*;
-import java.io.*;
+/**
+ * Base class for commands the namenode sends to a datanode.
+ * It carries only the action; subclasses such as BlockCommand add
+ * their own payload (blocks, targets).
+ */
+class DatanodeCommand implements Writable {
+  DatanodeProtocol.DataNodeAction action;
+
+  /** Create a command with the DNA_UNKNOWN action (used by the Writable factory). */
+  public DatanodeCommand() {
+    this( DatanodeProtocol.DataNodeAction.DNA_UNKNOWN );
+  }
+
+  /** Create a command carrying the given action. */
+  public DatanodeCommand( DatanodeProtocol.DataNodeAction action ) {
+    this.action = action;
+  }
+
+  ///////////////////////////////////////////
+  // Writable
+  ///////////////////////////////////////////
+  static {                                      // register a ctor
+    // Register the factory for THIS class. BlockCommand registers its own
+    // factory for BlockCommand.class; registering this one under
+    // BlockCommand.class would leave DatanodeCommand without a factory.
+    WritableFactories.setFactory
+      (DatanodeCommand.class,
+       new WritableFactory() {
+         public Writable newInstance() { return new DatanodeCommand(); }
+       });
+  }
+
+  /** Serialize the action enum. */
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeEnum( out, action );
+  }
+
+  /** Read back the action enum written by {@link #write(DataOutput)}. */
+  public void readFields(DataInput in) throws IOException {
+    this.action = (DatanodeProtocol.DataNodeAction)
+      WritableUtils.readEnum( in, DatanodeProtocol.DataNodeAction.class );
+  }
+}
/****************************************************
* A BlockCommand is an instruction to a datanode
@@ -30,25 +61,11 @@
*
* @author Mike Cafarella
****************************************************/
-class BlockCommand implements Writable {
-
- static { // register a ctor
- WritableFactories.setFactory
- (BlockCommand.class,
- new WritableFactory() {
- public Writable newInstance() { return new BlockCommand(); }
- });
- }
-
- DatanodeProtocol.DataNodeAction action;
+class BlockCommand extends DatanodeCommand {
Block blocks[];
DatanodeInfo targets[][];
- public BlockCommand() {
- this.action = DatanodeProtocol.DataNodeAction.DNA_UNKNOWN;
- this.blocks = new Block[0];
- this.targets = new DatanodeInfo[0][];
- }
+ public BlockCommand() {}
/**
* Create BlockCommand for transferring blocks to another datanode
@@ -56,7 +73,7 @@
* @param targets nodes to transfer
*/
public BlockCommand(Block blocks[], DatanodeInfo targets[][]) {
- this.action = DatanodeProtocol.DataNodeAction.DNA_TRANSFER;
+ super( DatanodeProtocol.DataNodeAction.DNA_TRANSFER );
this.blocks = blocks;
this.targets = targets;
}
@@ -66,16 +83,11 @@
* @param blocks blocks to invalidate
*/
public BlockCommand(Block blocks[]) {
- this.action = DatanodeProtocol.DataNodeAction.DNA_INVALIDATE;
+ super( DatanodeProtocol.DataNodeAction.DNA_INVALIDATE );
this.blocks = blocks;
this.targets = new DatanodeInfo[0][];
}
- public BlockCommand( DatanodeProtocol.DataNodeAction action ) {
- this();
- this.action = action;
- }
-
public Block[] getBlocks() {
return blocks;
}
@@ -87,8 +99,16 @@
///////////////////////////////////////////
// Writable
///////////////////////////////////////////
+ static { // register a ctor
+ WritableFactories.setFactory
+ (BlockCommand.class,
+ new WritableFactory() {
+ public Writable newInstance() { return new BlockCommand(); }
+ });
+ }
+
public void write(DataOutput out) throws IOException {
- WritableUtils.writeEnum( out, action );
+ super.write( out );
out.writeInt(blocks.length);
for (int i = 0; i < blocks.length; i++) {
blocks[i].write(out);
@@ -103,8 +123,7 @@
}
public void readFields(DataInput in) throws IOException {
- this.action = (DatanodeProtocol.DataNodeAction)
- WritableUtils.readEnum( in, DatanodeProtocol.DataNodeAction.class );
+ super.readFields( in );
this.blocks = new Block[in.readInt()];
for (int i = 0; i < blocks.length; i++) {
blocks[i] = new Block();
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Tue Apr 3 14:39:25 2007
@@ -29,11 +29,10 @@
**********************************************************************/
interface ClientProtocol extends VersionedProtocol {
- /* 7 : periodic checkpoint added.
- * 8 : refreshNodes added
- * 9 : clientMachine is removed from open() and create().
+ /*
+ * 10: finalizeUpgrade() added
*/
- public static final long versionID = 9L;
+ public static final long versionID = 10L;
///////////////////////////////////////
// File contents
@@ -348,4 +347,12 @@
*/
public void rollFsImage() throws IOException;
+ /**
+ * Finalize previous upgrade.
+ * Remove file system state saved during the upgrade.
+ * The upgrade will become irreversible.
+ *
+ * @throws IOException
+ */
+ public void finalizeUpgrade() throws IOException;
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java Tue Apr 3 14:39:25 2007
@@ -197,6 +197,26 @@
/**
+ * Command to ask the namenode to finalize previously performed upgrade.
+ * Usage: java DFSAdmin -finalizeUpgrade
+ * @exception IOException
+ */
+ public int finalizeUpgrade() throws IOException {
+ int exitCode = -1;
+
+ if (!(fs instanceof DistributedFileSystem)) {
+ System.out.println("FileSystem is " + fs.getUri());
+ return exitCode;
+ }
+
+ DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ dfs.finalizeUpgrade();
+ exitCode = 0;
+
+ return exitCode;
+ }
+
+ /**
* Displays format of commands.
* @param cmd The command that is being executed.
*/
@@ -210,11 +230,15 @@
} else if ("-refreshNodes".equals(cmd)) {
System.err.println("Usage: java DFSAdmin"
+ " [-refreshNodes]");
+ } else if ("-finalizeUpgrade".equals(cmd)) {
+ System.err.println("Usage: java DFSAdmin"
+ + " [-finalizeUpgrade]");
} else {
System.err.println("Usage: java DFSAdmin");
System.err.println(" [-report]");
System.err.println(" [-safemode enter | leave | get | wait]");
System.err.println(" [-refreshNodes]");
+ System.err.println(" [-finalizeUpgrade]");
System.err.println(" [-help [cmd]]");
}
}
@@ -253,6 +277,11 @@
printUsage(cmd);
return exitCode;
}
+ } else if ("-finalizeUpgrade".equals(cmd)) {
+ if (argv.length != 1) {
+ printUsage(cmd);
+ return exitCode;
+ }
}
@@ -276,6 +305,8 @@
setSafeMode(argv, i);
} else if ("-refreshNodes".equals(cmd)) {
exitCode = refreshNodes();
+ } else if ("-finalizeUpgrade".equals(cmd)) {
+ exitCode = finalizeUpgrade();
} else if ("-help".equals(cmd)) {
if (i < argv.length) {
printHelp(argv[i]);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Tue Apr 3 14:39:25 2007
@@ -377,6 +377,13 @@
}
/**
+ * @see ClientProtocol#finalizeUpgrade()
+ */
+ public void finalizeUpgrade() throws IOException {
+ namenode.finalizeUpgrade();
+ }
+
+ /**
*/
public boolean mkdirs(UTF8 src) throws IOException {
checkOpen();
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Tue Apr 3 14:39:25 2007
@@ -106,18 +106,20 @@
return new InetSocketAddress(hostname, port);
}
- DatanodeProtocol namenode;
- FSDataset data;
- DatanodeRegistration dnRegistration;
+ DatanodeProtocol namenode = null;
+ FSDataset data = null;
+ DatanodeRegistration dnRegistration = null;
private String networkLoc;
volatile boolean shouldRun = true;
Vector receivedBlockList = new Vector();
int xmitsInProgress = 0;
Daemon dataXceiveServer = null;
long blockReportInterval;
+ long lastBlockReport = 0;
+ long lastHeartbeat = 0;
long heartBeatInterval;
private DataStorage storage = null;
- private StatusHttpServer infoServer;
+ private StatusHttpServer infoServer = null;
private DataNodeMetrics myMetrics = new DataNodeMetrics();
private static InetSocketAddress nameNodeAddr;
private static DataNode datanodeObject = null;
@@ -187,73 +189,57 @@
}
/**
- * Create the DataNode given a configuration and an array of dataDirs.
- * 'dataDirs' is where the blocks are stored.
+ * @deprecated
+ * TODO: only MiniDFSCluster needs it, should be removed
*/
- DataNode(Configuration conf, String[] dataDirs) throws IOException {
- this(conf, NetworkTopology.DEFAULT_RACK, dataDirs );
- }
-
- DataNode(Configuration conf, String networkLoc, String[] dataDirs) throws IOException {
- this(networkLoc, dataDirs,
- createSocketAddr(conf.get("fs.default.name", "local")), conf);
- // register datanode
- int infoServerPort = conf.getInt("dfs.datanode.info.port", 50075);
- String infoServerBindAddress = conf.get("dfs.datanode.info.bindAddress", "0.0.0.0");
- this.infoServer = new StatusHttpServer("datanode", infoServerBindAddress, infoServerPort, true);
- //create a servlet to serve full-file content
- this.infoServer.addServlet(null, "/streamFile/*", StreamFile.class);
- this.infoServer.start();
- this.dnRegistration.infoPort = this.infoServer.getPort();
- // register datanode
- try {
- register();
- } catch (IOException ie) {
- try {
- infoServer.stop();
- } catch (Exception e) {
- }
- throw ie;
- }
- datanodeObject = this;
+ DataNode( Configuration conf, String networkLoc, String[] dataDirs ) throws IOException {
+ // networkLoc is ignored since it is already in the conf
+ this( conf, Storage.makeListOfFiles( dataDirs ) );
}
/**
- * A DataNode can also be created with configuration information
- * explicitly given.
- *
- * @see DataStorage
+ * Create the DataNode given a configuration and an array of dataDirs.
+ * 'dataDirs' is where the blocks are stored.
*/
- private DataNode(String networkLoc,
- String[] dataDirs,
- InetSocketAddress nameNodeAddr,
- Configuration conf ) throws IOException {
- File[] volumes = new File[dataDirs.length];
- for (int idx = 0; idx < dataDirs.length; idx++) {
- volumes[idx] = new File(dataDirs[idx]);
+ DataNode( Configuration conf,
+ AbstractList<File> dataDirs ) throws IOException {
+ try {
+ startDataNode( conf, dataDirs );
+ } catch (IOException ie) {
+ shutdown();
+ throw ie;
}
-
+ }
+
+ void startDataNode( Configuration conf,
+ AbstractList<File> dataDirs
+ ) throws IOException {
// use configured nameserver & interface to get local hostname
- machineName =
- DNS.getDefaultHost
- (conf.get("dfs.datanode.dns.interface","default"),
- conf.get("dfs.datanode.dns.nameserver","default"));
-
- // get storage info and lock the data dirs
- storage = new DataStorage( volumes );
- int numDirs = storage.getNumLocked();
- if (numDirs == 0) { // all data dirs are in use
- throw new IOException("Cannot start multiple Datanode instances "
- + "sharing the same data directories.\n"
- + StringUtils.arrayToString(dataDirs) + " are locked. ");
- }
- volumes = storage.getLockedDirs();
+ machineName = DNS.getDefaultHost(
+ conf.get("dfs.datanode.dns.interface","default"),
+ conf.get("dfs.datanode.dns.nameserver","default"));
+ InetSocketAddress nameNodeAddr = createSocketAddr(
+ conf.get("fs.default.name", "local"));
+
// connect to name node
this.namenode = (DatanodeProtocol)
RPC.waitForProxy(DatanodeProtocol.class,
DatanodeProtocol.versionID,
nameNodeAddr,
conf);
+ // get version and id info from the name-node
+ NamespaceInfo nsInfo = handshake();
+
+ // read storage info, lock data dirs and transition fs state if necessary
+ StartupOption startOpt = (StartupOption)conf.get( "dfs.datanode.startup",
+ StartupOption.REGULAR );
+ assert startOpt != null : "Startup option must be set.";
+ storage = new DataStorage();
+ storage.recoverTransitionRead( nsInfo, dataDirs, startOpt );
+
+ // initialize data node internal structure
+ this.data = new FSDataset( storage, conf );
+
// find free port
ServerSocket ss = null;
int tmpPort = conf.getInt("dfs.datanode.port", 50010);
@@ -268,15 +254,11 @@
}
}
// construct registration
- this.dnRegistration = new DatanodeRegistration(
- DFS_CURRENT_VERSION,
- machineName + ":" + tmpPort,
- storage.getStorageID(),
- -1,
- "" );
- this.networkLoc = networkLoc;
- // initialize data node internal structure
- this.data = new FSDataset(volumes, conf);
+ this.dnRegistration = new DatanodeRegistration(
+ machineName + ":" + tmpPort,
+ -1, // info port determined later
+ storage );
+
this.dataXceiveServer = new Daemon(new DataXceiveServer(ss));
long blockReportIntervalBasis =
@@ -284,7 +266,55 @@
this.blockReportInterval =
blockReportIntervalBasis - new Random().nextInt((int)(blockReportIntervalBasis/10));
this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
- this.nameNodeAddr = nameNodeAddr;
+ DataNode.nameNodeAddr = nameNodeAddr;
+
+ //create a servlet to serve full-file content
+ int infoServerPort = conf.getInt("dfs.datanode.info.port", 50075);
+ String infoServerBindAddress = conf.get("dfs.datanode.info.bindAddress", "0.0.0.0");
+ this.infoServer = new StatusHttpServer("datanode", infoServerBindAddress, infoServerPort, true);
+ this.infoServer.addServlet(null, "/streamFile/*", StreamFile.class);
+ this.infoServer.start();
+ this.dnRegistration.infoPort = this.infoServer.getPort();
+ // get network location
+ this.networkLoc = conf.get( "dfs.datanode.rack" );
+ if( networkLoc == null ) // exec network script or set the default rack
+ networkLoc = getNetworkLoc( conf );
+ // register datanode
+ register();
+ datanodeObject = this;
+ }
+
+  /**
+   * Obtain namespace information from the namenode and verify that the
+   * namenode and datanode build versions match.
+   * The version request is retried every second while it times out.
+   *
+   * @return namespace information reported by the namenode
+   * @throws IOException if the build versions differ, or if the version
+   *         request fails with an error other than a socket timeout
+   */
+  private NamespaceInfo handshake() throws IOException {
+    NamespaceInfo nsInfo;
+    while( true ) {
+      try {
+        nsInfo = namenode.versionRequest();
+        break;
+      } catch( SocketTimeoutException e ) {  // namenode is busy
+        LOG.info("Problem connecting to server: " + getNameNodeAddr());
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ie) {}
+      }
+    }
+    String errorMsg = null;
+    // verify build version
+    if( ! nsInfo.getBuildVersion().equals( Storage.getBuildVersion() )) {
+      errorMsg = "Incompatible build versions: namenode BV = "
+        + nsInfo.getBuildVersion() + "; datanode BV = "
+        + Storage.getBuildVersion();
+      LOG.fatal( errorMsg );
+      // Notifying the namenode is best effort: a failure here must not mask
+      // the version mismatch, which is the error reported to the caller.
+      // NOTE(review): dnRegistration has not been constructed yet when this
+      // runs from startDataNode() (it is built after handshake() returns);
+      // confirm the namenode's errorReport tolerates that.
+      try {
+        namenode.errorReport( dnRegistration,
+                              DatanodeProtocol.NOTIFY, errorMsg );
+      } catch( SocketTimeoutException e ) {  // namenode is busy
+        LOG.info("Problem connecting to server: " + getNameNodeAddr());
+      } catch( IOException e ) {
+        LOG.warn("Failed to report error to namenode: "
+                 + StringUtils.stringifyException( e ));
+      }
+      throw new IOException( errorMsg );
+    }
+    assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
+      "Data-node and name-node layout versions must be the same.";
+    return nsInfo;
}
/** Return the DataNode object
@@ -314,7 +344,7 @@
* 2) to receive a registrationID
* issued by the namenode to recognize registered datanodes.
*
- * @see FSNamesystem#registerDatanode(DatanodeRegistration)
+ * @see FSNamesystem#registerDatanode(DatanodeRegistration,String)
* @throws IOException
*/
private void register() throws IOException {
@@ -355,7 +385,7 @@
}
if (storage != null) {
try {
- this.storage.closeAll();
+ this.storage.unlockAll();
} catch (IOException ie) {
}
}
@@ -388,7 +418,6 @@
*/
public void offerService() throws Exception {
- long lastHeartbeat = 0, lastBlockReport = 0;
LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec");
//
@@ -410,47 +439,17 @@
// -- Total capacity
// -- Bytes remaining
//
- BlockCommand cmd = namenode.sendHeartbeat(dnRegistration,
+ DatanodeCommand cmd = namenode.sendHeartbeat( dnRegistration,
data.getCapacity(),
data.getRemaining(),
xmitsInProgress,
xceiverCount.getValue());
//LOG.info("Just sent heartbeat, with name " + localName);
lastHeartbeat = now;
-
- if( cmd != null ) {
- switch( cmd.action ) {
- case DNA_TRANSFER:
- //
- // Send a copy of a block to another datanode
- //
- transferBlocks( cmd.getBlocks(), cmd.getTargets() );
- break;
- case DNA_INVALIDATE:
- //
- // Some local block(s) are obsolete and can be
- // safely garbage-collected.
- //
- Block toDelete[] = cmd.getBlocks();
- data.invalidate(toDelete);
- myMetrics.removedBlocks(toDelete.length);
- break;
- case DNA_SHUTDOWN:
- // shut down the data node
- this.shutdown();
- continue;
- case DNA_REGISTER:
- // namenode requested a registration
- register();
- lastHeartbeat=0;
- lastBlockReport=0;
- continue;
- default:
- LOG.warn( "Unknown BlockCommand action: " + cmd.action);
- }
- }
+ if( ! processCommand( cmd ) )
+ continue;
}
-
+
// send block report
if (now - lastBlockReport > blockReportInterval) {
//
@@ -458,9 +457,9 @@
// Get back a list of local block(s) that are obsolete
// and can be safely GC'ed.
//
- Block toDelete[] = namenode.blockReport(dnRegistration,
- data.getBlockReport());
- data.invalidate(toDelete);
+ DatanodeCommand cmd = namenode.blockReport( dnRegistration,
+ data.getBlockReport());
+ processCommand( cmd );
lastBlockReport = now;
continue;
}
@@ -516,6 +515,51 @@
} // while (shouldRun)
} // offerService
+ /**
+ * Execute a command received from the namenode.
+ * @param cmd the command to execute; a null command is ignored
+ * @return true if further processing may be required or false otherwise.
+ * @throws IOException
+ */
+ private boolean processCommand( DatanodeCommand cmd ) throws IOException {
+ if( cmd == null )
+ return true;
+ switch( cmd.action ) {
+ case DNA_TRANSFER:
+ //
+ // Send a copy of a block to another datanode
+ //
+ BlockCommand bcmd = (BlockCommand)cmd;
+ transferBlocks( bcmd.getBlocks(), bcmd.getTargets() );
+ break;
+ case DNA_INVALIDATE:
+ //
+ // Some local block(s) are obsolete and can be
+ // safely garbage-collected.
+ //
+ Block toDelete[] = ((BlockCommand)cmd).getBlocks();
+ data.invalidate(toDelete);
+ myMetrics.removedBlocks(toDelete.length);
+ break;
+ case DNA_SHUTDOWN:
+ // shut down the data node
+ this.shutdown();
+ return false;
+ case DNA_REGISTER:
+ // namenode requested a registration
+ register();
+ lastHeartbeat=0;
+ lastBlockReport=0;
+ break;
+ case DNA_FINALIZE:
+ storage.finalizeUpgrade();
+ break;
+ default:
+ LOG.warn( "Unknown DatanodeCommand action: " + cmd.action);
+ }
+ return true;
+ }
+
private void transferBlocks( Block blocks[],
DatanodeInfo xferTargets[][]
) throws IOException {
@@ -1074,9 +1118,9 @@
/** Start datanode daemon.
*/
- public static void run(Configuration conf, String networkLoc) throws IOException {
+ public static void run(Configuration conf) throws IOException {
String[] dataDirs = conf.getStrings("dfs.data.dir");
- DataNode dn = makeInstance(networkLoc, dataDirs, conf);
+ DataNode dn = makeInstance(dataDirs, conf);
if (dn != null) {
dataNodeList.add(dn);
Thread t = new Thread(dn, "DataNode: [" +
@@ -1087,25 +1131,34 @@
}
}
- /**
- * Shut down all datanodes that where started via the
- * run(conf,networkLoc) method.
- * Returns only after shutdown is complete.
- */
- public static void shutdownAll(){
- while (!dataNodeList.isEmpty()) {
- dataNodeList.remove(0).shutdown();
- dataNodeThreadList.remove(0).interrupt();
- }
+ /**
+ * Shut down all datanodes that were started via the
+ * run(conf) method.
+ * Returns only after shutdown is complete.
+ */
+ public static void shutdownAll(){
+ while (!dataNodeList.isEmpty()) {
+ dataNodeList.remove(0).shutdown();
+ dataNodeThreadList.remove(0).interrupt();
}
-
+ }
/** Start a single datanode daemon and wait for it to finish.
* If this thread is specifically interrupted, it will stop waiting.
*/
- private static void runAndWait(Configuration conf, String networkLoc)
- throws IOException {
- run(conf, networkLoc);
+ static DataNode createDataNode( String args[],
+ Configuration conf ) throws IOException {
+ if( conf == null )
+ conf = new Configuration();
+ if( ! parseArguments( args, conf )) {
+ printUsage();
+ return null;
+ }
+ run(conf);
+ return (DataNode)dataNodeList.get(0);
+ }
+
+ void join() {
if (dataNodeThreadList.size() > 0) {
Thread t = (Thread) dataNodeThreadList.remove(dataNodeThreadList.size()-1);
try {
@@ -1125,24 +1178,22 @@
* no directory from this directory list can be created.
* @throws IOException
*/
- static DataNode makeInstance( String[] dataDirs, Configuration conf)
- throws IOException {
- return makeInstance(NetworkTopology.DEFAULT_RACK, dataDirs, conf );
- }
-
- static DataNode makeInstance(String networkLoc, String[] dataDirs, Configuration conf)
+ static DataNode makeInstance( String[] dataDirs, Configuration conf )
throws IOException {
- ArrayList<String> dirs = new ArrayList<String>();
+ ArrayList<File> dirs = new ArrayList<File>();
for (int i = 0; i < dataDirs.length; i++) {
File data = new File(dataDirs[i]);
try {
DiskChecker.checkDir( data );
- dirs.add(dataDirs[i]);
+ dirs.add(data);
} catch( DiskErrorException e ) {
LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage() );
}
}
- return ((dirs.size() > 0) ? new DataNode(conf, networkLoc, dirs.toArray(new String[dirs.size()])) : null);
+ if( dirs.size() > 0 )
+ return new DataNode(conf, dirs);
+ LOG.error("All directories in dfs.data.dir are invalid." );
+ return null;
}
public String toString() {
@@ -1154,11 +1205,49 @@
"}";
}
+ private static void printUsage() {
+ System.err.println("Usage: java DataNode");
+ System.err.println(" [-r, --rack <network location>] |");
+ System.err.println(" [-rollback]");
+ }
+
+ /**
+ * Parse and verify command line arguments and set configuration parameters.
+ *
+ * @return false if passed arguments are incorrect
+ */
+ private static boolean parseArguments(String args[],
+ Configuration conf ) {
+ int argsLen = (args == null) ? 0 : args.length;
+ StartupOption startOpt = StartupOption.REGULAR;
+ String networkLoc = null;
+ for( int i=0; i < argsLen; i++ ) {
+ String cmd = args[i];
+ if( "-r".equalsIgnoreCase(cmd) || "--rack".equalsIgnoreCase(cmd) ) {
+ if( i==args.length-1 )
+ return false;
+ networkLoc = args[++i];
+ if( networkLoc.startsWith("-") )
+ return false;
+ } else if( "-rollback".equalsIgnoreCase(cmd) ) {
+ startOpt = StartupOption.ROLLBACK;
+ } else if( "-regular".equalsIgnoreCase(cmd) ) {
+ startOpt = StartupOption.REGULAR;
+ } else
+ return false;
+ }
+ if( networkLoc != null )
+ conf.set( "dfs.datanode.rack", NodeBase.normalize( networkLoc ));
+ conf.setObject( "dfs.datanode.startup", startOpt );
+ return true;
+ }
+
/* Get the network location by running a script configured in conf */
private static String getNetworkLoc( Configuration conf )
throws IOException {
String locScript = conf.get("dfs.network.script" );
- if( locScript == null ) return null;
+ if( locScript == null )
+ return NetworkTopology.DEFAULT_RACK;
LOG.info( "Starting to run script to get datanode network location");
Process p = Runtime.getRuntime().exec( locScript );
@@ -1222,49 +1311,13 @@
return networkLoc.toString();
}
-
- /* Get the network location from the command line */
- private static String getNetworkLoc(String args[]) {
- for( int i=0; i< args.length; i++ ) {
- if ("-r".equals(args[i])||"--rack".equals(args[i]) ) {
- if( i==args.length-1 ) {
- printUsage();
- } else {
- return args[++i];
- }
- }
- }
- return null;
- }
-
- /* Return the datanode's network location
- * either from the command line, from script, or a default value
- */
- private static String getNetworkLoc(String args[], Configuration conf)
- throws IOException {
- String networkLoc = getNetworkLoc( args );
- if( networkLoc == null ) {
- networkLoc = getNetworkLoc( conf );
- }
- if( networkLoc == null ) {
- return NetworkTopology.DEFAULT_RACK;
- } else {
- return NodeBase.normalize( networkLoc );
- }
- }
-
- private static void printUsage() {
- System.err.println(
- "Usage: java DataNode [-r, --rack <network location>]");
- }
-
-
/**
*/
- public static void main(String args[]) throws IOException {
+ public static void main(String args[]) {
try {
- Configuration conf = new Configuration();
- runAndWait(conf, getNetworkLoc(args, conf));
+ DataNode datanode = createDataNode( args, null );
+ if( datanode != null )
+ datanode.join();
} catch ( Throwable e ) {
LOG.error( StringUtils.stringifyException( e ) );
System.exit(-1);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java Tue Apr 3 14:39:25 2007
@@ -4,213 +4,409 @@
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
+import java.util.Collection;
import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Properties;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.dfs.FSConstants.StartupOption;
+import org.apache.hadoop.dfs.FSConstants.NodeType;
+import org.apache.hadoop.dfs.FSImage.NameNodeFile;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.fs.FileUtil.HardLink;
/**
* Data storage information file.
* <p>
- * During startup the datanode reads its data storage file.
- * The data storage file is stored in all the dfs.data.dir directories.
- * It contains version and storageID.
- * Datanode holds a lock on all the dataStorage files while it runs so that other
- * datanodes were not able to start working with the same data storage.
- * The locks are released when the datanode stops (normally or abnormally).
- *
+ * @see Storage
* @author Konstantin Shvachko
*/
-class DataStorage {
- public static final String STORAGE_INFO_FILE_NAME = "storage";
- public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.DataStorage");
-
- // persistent fields
- private int version = 0; /// stored version
- private String storageID; /// unique per cluster storageID
+class DataStorage extends Storage {
+ // Constants
+ /** Name prefix of block subdirectories; used by linkBlocks and FSDataset. */
+ final static String BLOCK_SUBDIR_PREFIX = "subdir";
+ /** Name prefix of block files; used by linkBlocks. */
+ final static String BLOCK_FILE_PREFIX = "blk_";
- // non persistent fields
- private ArrayList storageFiles = new ArrayList();
- private ArrayList storageLocks = new ArrayList();
+ /** Storage ID of this data-node; "" means not yet read or assigned. */
+ private String storageID;
+
+ /** Create data-node storage with an empty storage ID. */
+ DataStorage() {
+ super( NodeType.DATA_NODE );
+ storageID = "";
+ }
- // cache away the names of all passed in dirs
- private File[] origDirs = null;
+ /**
+ * Create data-node storage from explicit storage info fields.
+ * @param nsID namespace ID
+ * @param cT passed to Storage; presumably the cTime -- confirm in Storage
+ * @param strgID storage ID
+ */
+ DataStorage( int nsID, long cT, String strgID ) {
+ super( NodeType.DATA_NODE, nsID, cT );
+ this.storageID = strgID;
+ }
- // cache away the names of locked dirs
- private File[] dirs = null;
+ /**
+ * Create data-node storage from an existing StorageInfo.
+ * @param storageInfo storage info passed to Storage
+ * @param strgID storage ID
+ */
+ DataStorage( StorageInfo storageInfo, String strgID ) {
+ super( NodeType.DATA_NODE, storageInfo );
+ this.storageID = strgID;
+ }
+
+ /** @return the storage ID; "" if none has been read or assigned yet */
+ String getStorageID() {
+ return storageID;
+ }
- private int numLocked = 0;
+ /** @param newStorageID storage ID to use from now on */
+ void setStorageID( String newStorageID ) {
+ this.storageID = newStorageID;
+ }
/**
- * Create DataStorage and verify its version.
+ * Analyze storage directories.
+ * Recover from previous transitions if required.
+ * Perform fs state transition if necessary depending on the namespace info.
+ * Read storage info.
+ * <p>
+ * Directories reported as NON_EXISTENT are skipped and removed from
+ * {@code dataDirs} through its iterator, so the caller's collection is
+ * modified. If no directory remains an IOException is thrown.
*
+ * @param nsInfo namespace information
- * @param dataDirs array of data storage directories
+ * @param dataDirs collection of data storage directories; entries that do
+ * not exist are removed from it
+ * @param startOpt startup option
* @throws IOException
*/
- public DataStorage( File[] dataDirs ) throws IOException {
- this( DataNode.DFS_CURRENT_VERSION, dataDirs );
+ void recoverTransitionRead( NamespaceInfo nsInfo,
+ Collection<File> dataDirs,
+ StartupOption startOpt
+ ) throws IOException {
+ assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
+ "Data-node and name-node layout versions must be the same.";
- if( version < FSConstants.DFS_CURRENT_VERSION ) // future version
- throw new IncorrectVersionException( version, "data storage" );
- }
-
- /**
- * Create DataStorage.
- *
- * Read data storage files if they exist or create them if not.
- * Lock the files.
- *
- * @param curVersion can be used to read file saved with a previous version.
- * @param dataDirs Array of data storage directories
- * @throws IOException
- */
- public DataStorage( int curVersion, File[] dataDirs ) throws IOException {
- this.version = curVersion;
- this.origDirs = dataDirs;
- for (int idx = 0; idx < dataDirs.length; idx++) {
- storageFiles.add(idx, new RandomAccessFile(
- new File(dataDirs[idx], STORAGE_INFO_FILE_NAME ),
- "rws" ));
- lock(idx);
- boolean needToSave;
+ // 1. For each data directory calculate its state and
+ // check whether all is consistent before transitioning.
+ // Format and recover.
+ this.storageID = "";
+ this.storageDirs = new ArrayList<StorageDirectory>( dataDirs.size() );
+ // NOTE(review): dataDirStates is filled below but never read in this method.
+ ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>( dataDirs.size() );
+ for( Iterator<File> it = dataDirs.iterator(); it.hasNext(); ) {
+ File dataDir = it.next();
+ StorageDirectory sd = new StorageDirectory( dataDir );
+ StorageState curState;
+ // Only IOException releases the lock on sd before rethrowing.
+ // NOTE(review): any other exception thrown here leaves sd locked.
try {
- needToSave = read(idx);
- } catch( java.io.EOFException e ) {
- storageID = "";
- needToSave = true;
- }
-
- if( needToSave ) { write(idx); }
-
- RandomAccessFile file = (RandomAccessFile) storageFiles.get(idx);
- if (file != null) { numLocked++; }
- }
- if (numLocked > 0) {
- this.dirs = new File[numLocked];
- int curidx = 0;
- for (int idx = 0; idx < dataDirs.length; idx++) {
- if (storageFiles.get(idx) != null) {
- dirs[curidx] = dataDirs[idx];
- curidx++;
+ curState = sd.analyzeStorage( startOpt );
+ // sd is locked but not opened
+ switch( curState ) {
+ case NORMAL:
+ break;
+ case NON_EXISTENT:
+ // ignore this storage
+ LOG.info( "Storage directory " + dataDir + " does not exist." );
+ // removes the directory from the caller's dataDirs collection
+ it.remove();
+ continue;
+ case CONVERT:
+ convertLayout( sd, nsInfo );
+ break;
+ case NOT_FORMATTED: // format
+ LOG.info( "Storage directory " + dataDir + " is not formatted." );
+ LOG.info( "Formatting ..." );
+ format( sd, nsInfo );
+ break;
+ default: // recovery part is common
+ sd.doRecover( curState );
}
+ } catch (IOException ioe) {
+ sd.unlock();
+ throw ioe;
}
+ // add to the storage list
+ addStorageDir( sd );
+ dataDirStates.add( curState );
}
- }
-
- public int getVersion() {
- return version;
- }
- public String getStorageID() {
- return storageID;
+ if( dataDirs.size() == 0 ) // none of the data dirs exist
+ throw new IOException(
+ "All specified directories are not accessible or do not exist." );
+
+ // 2. Do transitions
+ // Each storage directory is treated individually.
+ // During startup some of them can upgrade or rollback
+ // while others could be up to date for the regular startup.
+ for( int idx = 0; idx < getNumStorageDirs(); idx++ ) {
+ doTransition( getStorageDir( idx ), nsInfo, startOpt );
+ assert this.getLayoutVersion() == nsInfo.getLayoutVersion() :
+ "Data-node and name-node layout versions must be the same.";
+ assert this.getCTime() == nsInfo.getCTime() :
+ "Data-node and name-node CTimes must be the same.";
+ }
+
+ // 3. Update all storages. Some of them might have just been formatted.
+ this.writeAll();
}
-
- public int getNumLocked() {
- return numLocked;
+
+ /**
+ * Format a storage directory: clear it, then write it with the current
+ * LAYOUT_VERSION, the name-node's namespaceID, a cTime of 0, and the
+ * storageID as it currently is (possibly "").
+ *
+ * @param sd storage directory to format
+ * @param nsInfo namespace information supplying the namespaceID
+ * @throws IOException if clearing or writing the directory fails
+ */
+ void format( StorageDirectory sd, NamespaceInfo nsInfo ) throws IOException {
+ sd.clearDirectory(); // create directory
+ this.layoutVersion = FSConstants.LAYOUT_VERSION;
+ this.namespaceID = nsInfo.getNamespaceID();
+ this.cTime = 0;
+ // store storageID as it currently is
+ sd.write();
}
-
- public File[] getLockedDirs() {
- return dirs;
+
+ /**
+ * Store the superclass fields plus this node's storage ID under the
+ * "storageID" key.
+ */
+ protected void setFields( Properties props,
+ StorageDirectory sd
+ ) throws IOException {
+ super.setFields( props, sd );
+ props.setProperty( "storageID", storageID );
}
-
- public void setStorageID( String newStorageID ) {
- this.storageID = newStorageID;
+
+ /**
+ * Read the superclass fields and the "storageID" property.
+ * <p>
+ * The stored ID is accepted when the in-memory ID is empty, the stored
+ * ID is empty, or both are equal. The in-memory ID is replaced only when
+ * it was empty.
+ *
+ * @throws InconsistentFSStateException if "storageID" is missing or
+ * conflicts with a non-empty in-memory storage ID
+ */
+ protected void getFields( Properties props,
+ StorageDirectory sd
+ ) throws IOException {
+ super.getFields( props, sd );
+ String ssid = props.getProperty( "storageID" );
+ if( ssid == null ||
+ ! ("".equals( storageID ) || "".equals( ssid ) ||
+ storageID.equals( ssid )))
+ throw new InconsistentFSStateException( sd.root,
+ "has incompatible storage Id." );
+ if( "".equals( storageID ) ) // update id only if it was empty
+ storageID = ssid;
}
-
- public void setVersion( int newVersion ) {
- this.version = newVersion;
+
+ /**
+ * Check whether a storage directory still has the old layout, i.e. an
+ * old "storage" file directly under its root.
+ *
+ * @param sd storage directory to inspect
+ * @return false if there is no old "storage" file; true if there is one
+ * and the old "data" block directory is a writable directory
+ * @throws InconsistentFSStateException if the "storage" file exists but
+ * "data" is missing, is not a directory, or is not writable
+ */
+ boolean isConversionNeeded( StorageDirectory sd ) throws IOException {
+ File oldF = new File( sd.root, "storage" );
+ if( ! oldF.exists() )
+ return false;
+ // check consistency of the old storage
+ File oldDataDir = new File( sd.root, "data" );
+ if( ! oldDataDir.exists() )
+ throw new InconsistentFSStateException( sd.root,
+ "Old layout block directory " + oldDataDir + " is missing" );
+ if( ! oldDataDir.isDirectory() )
+ throw new InconsistentFSStateException( sd.root,
+ oldDataDir + " is not a directory." );
+ if( ! oldDataDir.canWrite() )
+ throw new InconsistentFSStateException( sd.root,
+ oldDataDir + " is not writable." );
+ return true;
}
/**
- * Lock datastorage file.
+ * Automatic conversion from the old layout version to the new one.
+ * <p>
+ * The old "storage" file is locked and read to get the old layout
+ * version and storage ID. The old "data" block directory is then renamed
+ * to the current directory and the new storage info is written there.
+ * Finally the old "storage" file is moved into the current directory.
*
+ * @param sd storage directory
+ * @param nsInfo namespace information
* @throws IOException
+ * if the old file cannot be locked or read, its version is newer than
+ * LAST_PRE_UPGRADE_LAYOUT_VERSION, a version file already exists,
+ * or any rename/write fails
*/
- private void lock(int idx) throws IOException {
- RandomAccessFile file = (RandomAccessFile) storageFiles.get(idx);
- FileLock lock = file.getChannel().tryLock();
- if (lock == null) {
- // log a warning
- LOG.warn("Cannot lock storage file in directory "+origDirs[idx].getName());
- // remove the file from fileList, and close it
- storageFiles.add(idx, null);
- file.close();
+ private void convertLayout( StorageDirectory sd,
+ NamespaceInfo nsInfo
+ ) throws IOException {
+ assert FSConstants.LAYOUT_VERSION < LAST_PRE_UPGRADE_LAYOUT_VERSION :
+ "Bad current layout version: FSConstants.LAYOUT_VERSION should decrease";
+ File oldF = new File( sd.root, "storage" );
+ File oldDataDir = new File( sd.root, "data" );
+ assert oldF.exists() : "Old datanode layout \"storage\" file is missing";
+ assert oldDataDir.exists() : "Old layout block directory \"data\" is missing";
+ LOG.info( "Old layout version file " + oldF
+ + " is found. New layout version is "
+ + FSConstants.LAYOUT_VERSION );
+ LOG.info( "Converting ..." );
+
+ // Lock and read the old storage file.
+ // The RandomAccessFile constructor either succeeds or throws, so no null
+ // check is needed. Locking happens inside the try so that the file is
+ // closed on every path, including when tryLock() fails or returns null.
+ RandomAccessFile oldFile = new RandomAccessFile( oldF, "rws" );
+ FileLock oldLock = null;
+ try {
+ oldLock = oldFile.getChannel().tryLock();
+ if (oldLock == null) // another program holds the lock
+ throw new IOException( "Cannot lock file: " + oldF );
+ oldFile.seek(0);
+ int oldVersion = oldFile.readInt();
+ if( oldVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION )
+ throw new IncorrectVersionException( oldVersion, "file " + oldF,
+ LAST_PRE_UPGRADE_LAYOUT_VERSION );
+ String oldStorageID = org.apache.hadoop.io.UTF8.readString( oldFile );
+
+ // check new storage
+ File newDataDir = sd.getCurrentDir();
+ File versionF = sd.getVersionFile();
+ if( versionF.exists() )
+ throw new IOException( "Version file already exists: " + versionF );
+ if( newDataDir.exists() ) // somebody created current dir manually
+ deleteDir( newDataDir );
+ // Write new layout
+ rename( oldDataDir, newDataDir );
+
+ this.layoutVersion = FSConstants.LAYOUT_VERSION;
+ this.namespaceID = nsInfo.getNamespaceID();
+ this.cTime = 0;
+ this.storageID = oldStorageID;
+ sd.write();
+ } finally {
+ // unlock and close the old file; close it even if release() fails
+ try {
+ if( oldLock != null )
+ oldLock.release();
+ } finally {
+ oldFile.close();
+ }
}
- storageLocks.add(idx, lock);
+ // move old storage file into current dir
+ rename( oldF, new File( sd.getCurrentDir(), "storage" ));
+ LOG.info( "Conversion of " + oldF + " is complete." );
}
-
+
/**
- * Unlock datastorage file.
- * @param idx File index
+ * Analyze whether a transition of the fs state is required and, if so,
+ * which one, and perform it.
*
+ * Rollback if previousLV >= LAYOUT_VERSION && prevCTime <= namenode.cTime
+ * Upgrade if this.LV > LAYOUT_VERSION || this.cTime < namenode.cTime
+ * Regular startup if this.LV = LAYOUT_VERSION && this.cTime = namenode.cTime
+ *
+ * Layout versions are negative and decrease with every new layout, so
+ * "this.LV > LAYOUT_VERSION" means the stored layout is older. In the
+ * remaining case (same LV, this.cTime > namenode.cTime) an IOException
+ * is thrown.
+ *
+ * @param sd storage directory
+ * @param nsInfo namespace info
+ * @param startOpt startup option
* @throws IOException
+ * if the namespaceIDs differ or the stored state is newer than the
+ * namespace state
*/
- private void unlock(int idx) throws IOException {
- FileLock lock = (FileLock) storageLocks.get(idx);
- if (lock != null) { lock.release(); }
- }
-
- /**
- * Close a datastorage file.
- * @param idx file index
- * @throws IOException
- */
- private void close(int idx) throws IOException {
- FileLock lock = (FileLock) storageLocks.get(idx);
- if (lock == null) { return; }
- lock.release();
- RandomAccessFile file = (RandomAccessFile) storageFiles.get(idx);
- file.close();
- }
-
- /**
- * Close all datastorage files.
- * @throws IOException
- */
- public void closeAll() throws IOException {
- for (int idx = 0; idx < dirs.length; idx++) {
- close(idx);
+ private void doTransition( StorageDirectory sd,
+ NamespaceInfo nsInfo,
+ StartupOption startOpt
+ ) throws IOException {
+ // roll back before reading, so the checks below see the restored state
+ if( startOpt == StartupOption.ROLLBACK )
+ doRollback( sd, nsInfo ); // rollback if applicable
+ sd.read();
+ assert this.layoutVersion >= FSConstants.LAYOUT_VERSION :
+ "Future version is not allowed";
+ if( getNamespaceID() != nsInfo.getNamespaceID() )
+ throw new IOException(
+ "Incompatible namespaceIDs in " + sd.root.getCanonicalPath()
+ + ": namenode namespaceID = " + nsInfo.getNamespaceID()
+ + "; datanode namespaceID = " + getNamespaceID() );
+ if( this.layoutVersion == FSConstants.LAYOUT_VERSION
+ && this.cTime == nsInfo.getCTime() )
+ return; // regular startup
+ if( this.layoutVersion > FSConstants.LAYOUT_VERSION
+ || this.cTime < nsInfo.getCTime() ) {
+ doUpgrade( sd, nsInfo ); // upgrade
+ return;
}
+ // layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime
+ // must shutdown
+ throw new IOException("Datanode state: LV = " + this.getLayoutVersion()
+ + " CTime = " + this.getCTime()
+ + " is newer than the namespace state: LV = "
+ + nsInfo.getLayoutVersion()
+ + " CTime = " + nsInfo.getCTime() );
}
-
+
/**
- * Read data storage file.
- * @param idx File index
- * @return whether the data storage file need to be updated.
+ * Move current storage into a backup directory,
+ * and hardlink all its blocks into the new current directory.
+ * <p>
+ * Steps:
+ * <ol>
+ * <li>delete any existing previous directory;</li>
+ * <li>rename current to previous.tmp;</li>
+ * <li>hard-link block files and block subdirectories into a new current directory;</li>
+ * <li>write the storage info with the current LAYOUT_VERSION and the name-node's cTime;</li>
+ * <li>rename previous.tmp to previous.</li>
+ * </ol>
+ *
+ * @param sd storage directory
+ * @param nsInfo namespace information supplying the target cTime
* @throws IOException
*/
- private boolean read(int idx) throws IOException {
- RandomAccessFile file = (RandomAccessFile) storageFiles.get(idx);
- if (file == null) { return false; }
- file.seek(0);
- this.version = file.readInt();
- this.storageID = UTF8.readString( file );
- return false;
+ void doUpgrade( StorageDirectory sd,
+ NamespaceInfo nsInfo
+ ) throws IOException {
+ LOG.info( "Upgrading storage directory " + sd.root
+ + ".\n old LV = " + this.getLayoutVersion()
+ + "; old CTime = " + this.getCTime()
+ + ".\n new LV = " + nsInfo.getLayoutVersion()
+ + "; new CTime = " + nsInfo.getCTime() );
+ File curDir = sd.getCurrentDir();
+ File prevDir = sd.getPreviousDir();
+ assert curDir.exists() : "Current directory must exist.";
+ // delete previous dir before upgrading
+ if( prevDir.exists() )
+ deleteDir( prevDir );
+ File tmpDir = sd.getPreviousTmp();
+ assert ! tmpDir.exists() : "previous.tmp directory must not exist.";
+ // rename current to tmp
+ rename( curDir, tmpDir );
+ // hardlink blocks
+ linkBlocks( tmpDir, curDir );
+ // write version file
+ this.layoutVersion = FSConstants.LAYOUT_VERSION;
+ assert this.namespaceID == nsInfo.getNamespaceID() :
+ "Data-node and name-node namespace IDs must be the same.";
+ this.cTime = nsInfo.getCTime();
+ sd.write();
+ // rename tmp to previous
+ rename( tmpDir, prevDir );
+ LOG.info( "Upgrade of " + sd.root + " is complete." );
}
- /**
- * Write data storage file.
- * @param idx File index
- * @throws IOException
- */
- private void write(int idx) throws IOException {
- RandomAccessFile file = (RandomAccessFile) storageFiles.get(idx);
- if (file == null) { return; }
- file.seek(0);
- file.writeInt( this.version );
- UTF8.writeString( file, this.storageID );
+ /**
+ * Restore the "previous" state of a storage directory, if there is one.
+ * <p>
+ * The previous storage info is read into a scratch DataStorage. Rolling
+ * back is only allowed to a state that either matches the namespace or
+ * can later be upgraded to it. If allowed, current is moved to
+ * removed.tmp, previous becomes current, and removed.tmp is deleted.
+ *
+ * @param sd storage directory to roll back
+ * @param nsInfo namespace information the previous state is checked against
+ * @throws InconsistentFSStateException if the previous state is newer
+ * than the namespace state
+ * @throws IOException if reading, renaming or deleting fails
+ */
+ void doRollback( StorageDirectory sd, NamespaceInfo nsInfo ) throws IOException {
+ File previous = sd.getPreviousDir();
+ if( !previous.exists() ) {
+ return; // nothing to roll back to: continue with a regular startup
+ }
+
+ DataStorage previousState = new DataStorage();
+ StorageDirectory previousSD = previousState.new StorageDirectory( sd.root );
+ previousSD.read( previousSD.getPreviousVersionFile() );
+
+ // The previous state must be the same as, or older than, the namespace
+ // state (layout versions are negative and decrease over time).
+ boolean rollbackPermitted =
+ previousState.getLayoutVersion() >= FSConstants.LAYOUT_VERSION
+ && previousState.getCTime() <= nsInfo.getCTime();
+ if( !rollbackPermitted ) {
+ throw new InconsistentFSStateException( previousSD.root,
+ "Cannot rollback to a newer state.\nDatanode previous state: LV = "
+ + previousState.getLayoutVersion() + " CTime = " + previousState.getCTime()
+ + " is newer than the namespace state: LV = "
+ + nsInfo.getLayoutVersion() + " CTime = " + nsInfo.getCTime() );
+ }
+
+ LOG.info( "Rolling back storage directory " + sd.root
+ + ".\n target LV = " + nsInfo.getLayoutVersion()
+ + "; target CTime = " + nsInfo.getCTime() );
+ File removedTmp = sd.getRemovedTmp();
+ assert ! removedTmp.exists() : "removed.tmp directory must not exist.";
+ File current = sd.getCurrentDir();
+ assert current.exists() : "Current directory must exist.";
+ // swap: current -> removed.tmp, previous -> current, then drop removed.tmp
+ rename( current, removedTmp );
+ rename( previous, current );
+ deleteDir( removedTmp );
+ LOG.info( "Rollback of " + sd.root + " is complete." );
+ }
+
+ /**
+ * Discard the "previous" state of a storage directory, making the last
+ * upgrade permanent.
+ * <p>
+ * The previous directory is renamed to sd.getFinalizedTmp() right away.
+ * Deleting that directory happens in a separate thread, so this method
+ * may return before the data is actually removed.
+ *
+ * @param sd storage directory to finalize
+ * @throws IOException if the canonical path cannot be resolved or the
+ * rename fails
+ */
+ void doFinalize( StorageDirectory sd ) throws IOException {
+ File prevDir = sd.getPreviousDir();
+ if( ! prevDir.exists() )
+ return; // already discarded
+ final String dataDirPath = sd.root.getCanonicalPath();
+ LOG.info( "Finalizing upgrade for storage directory "
+ + dataDirPath
+ + ".\n cur LV = " + this.getLayoutVersion()
+ + "; cur CTime = " + this.getCTime() );
+ assert sd.getCurrentDir().exists() : "Current directory must exist.";
+ final File tmpDir = sd.getFinalizedTmp();
+ // rename previous to tmp
+ rename( prevDir, tmpDir );
+
+ // delete tmp dir in a separate thread
+ new Daemon( new Runnable() {
+ public void run() {
+ try {
+ deleteDir( tmpDir );
+ // report completion only when the deletion actually succeeded
+ LOG.info( "Finalize upgrade for " + dataDirPath + " is complete." );
+ } catch( IOException ex ) {
+ LOG.error( "Finalize upgrade for " + dataDirPath + " failed.", ex );
+ }
+ }
+ public String toString() { return "Finalize " + dataDirPath; }
+ }).start();
}
- /**
- * Write all data storage files.
- * @throws IOException
- */
- public void writeAll() throws IOException {
- for (int idx = 0; idx < dirs.length; idx++) {
- write(idx);
+ /**
+ * Finalize the last upgrade in every storage directory of this node,
+ * in storage-directory order.
+ * @throws IOException if finalizing a directory fails; the directories
+ * after it are not processed
+ */
+ void finalizeUpgrade() throws IOException {
+ Iterator<StorageDirectory> remaining = storageDirs.iterator();
+ while( remaining.hasNext() ) {
+ doFinalize( remaining.next() );
}
}
+ /**
+ * Recursively hard-link block files from one directory tree into another.
+ * <p>
+ * If {@code from} is not a directory, a hard link {@code to} is created
+ * for it. Otherwise {@code to} is created as a new directory, and every
+ * entry of {@code from} whose name starts with BLOCK_SUBDIR_PREFIX or
+ * BLOCK_FILE_PREFIX is linked recursively. Other entries are skipped.
+ *
+ * @param from existing block file or directory
+ * @param to destination path; must not exist yet when {@code from} is a directory
+ * @throws IOException if a directory cannot be created or listed, or a
+ * hard link cannot be made
+ */
+ static void linkBlocks( File from, File to ) throws IOException {
+ if( ! from.isDirectory() ) {
+ HardLink.createHardLink( from, to );
+ return;
+ }
+ // from is a directory
+ if( ! to.mkdir() )
+ throw new IOException("Cannot create directory " + to );
+ String[] blockNames = from.list( new java.io.FilenameFilter() {
+ public boolean accept(File dir, String name) {
+ return name.startsWith( BLOCK_SUBDIR_PREFIX )
+ || name.startsWith( BLOCK_FILE_PREFIX );
+ }
+ });
+ // File.list() returns null on an I/O error; report it as an IOException,
+ // which the upgrade path already propagates, instead of failing with an NPE.
+ if( blockNames == null )
+ throw new IOException( "Cannot list directory " + from );
+
+ for( int i = 0; i < blockNames.length; i++ )
+ linkBlocks( new File(from, blockNames[i]), new File(to, blockNames[i]) );
+ }
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java Tue Apr 3 14:39:25 2007
@@ -31,9 +31,15 @@
* @author Michael Cafarella
**********************************************************************/
interface DatanodeProtocol extends VersionedProtocol {
- public static final long versionID = 5L; // register takes a new parameter
+ /*
+ * 6: versionRequest() added;
+ * sendHeartbeat() and blockReport() return DatanodeCommand;
+ * DatanodeRegistration contains StorageInfo
+ */
+ public static final long versionID = 6L;
// error code
+ // Presumably the values passed as errorReport()'s errorCode argument.
+ // NOTIFY (0) looks like a non-error, informational report -- TODO confirm
+ // against the name-node's errorReport handling.
+ final static int NOTIFY = 0;
final static int DISK_ERROR = 1;
final static int INVALID_BLOCK = 2;
@@ -45,7 +51,8 @@
DNA_TRANSFER, // transfer blocks to another datanode
DNA_INVALIDATE, // invalidate blocks
DNA_SHUTDOWN, // shutdown node
- DNA_REGISTER; } // re-register
+ DNA_REGISTER, // re-register
+ DNA_FINALIZE; } // finalize previous upgrade
/**
* Register Datanode.
@@ -63,14 +70,14 @@
/**
* sendHeartbeat() tells the NameNode that the DataNode is still
* alive and well. Includes some status info, too.
- * It also gives the NameNode a chance to return a "BlockCommand" object.
- * A BlockCommand tells the DataNode to invalidate local block(s),
+ * It also gives the NameNode a chance to return a "DatanodeCommand" object.
+ * A DatanodeCommand tells the DataNode to invalidate local block(s),
* or to copy them to other DataNodes, etc.
+ * The command is presumably identified by one of the DNA_* actions above,
+ * e.g. DNA_FINALIZE to finalize a previous upgrade -- confirm in DatanodeCommand.
+ *
+ * @param registration registration of the reporting data-node
+ * @return a command for the data-node to act on
*/
- public BlockCommand sendHeartbeat(DatanodeRegistration registration,
- long capacity, long remaining,
- int xmitsInProgress,
- int xceiverCount) throws IOException;
+ public DatanodeCommand sendHeartbeat( DatanodeRegistration registration,
+ long capacity, long remaining,
+ int xmitsInProgress,
+ int xceiverCount) throws IOException;
/**
* blockReport() tells the NameNode about all the locally-stored blocks.
@@ -79,8 +86,8 @@
* the locally-stored blocks. It's invoked upon startup and then
* infrequently afterwards.
*/
- public Block[] blockReport( DatanodeRegistration registration,
- Block blocks[]) throws IOException;
+ public DatanodeCommand blockReport( DatanodeRegistration registration,
+ Block blocks[]) throws IOException;
/**
* blockReceived() allows the DataNode to tell the NameNode about
@@ -98,4 +105,6 @@
public void errorReport(DatanodeRegistration registration,
int errorCode,
String msg) throws IOException;
+
+ /**
+ * Request the name-node's namespace information (layout version,
+ * namespace ID and cTime). The data-node checks its storage against it
+ * before registering.
+ * @return namespace information of the name-node
+ * @throws IOException
+ */
+ public NamespaceInfo versionRequest() throws IOException;
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeRegistration.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeRegistration.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeRegistration.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeRegistration.java Tue Apr 3 14:39:25 2007
@@ -25,40 +25,50 @@
});
}
- int version; /// current Datanode version
- String registrationID; /// a unique per namenode id; indicates
- /// the namenode the datanode is registered with
+ /** Storage state (layout version, namespaceID, cTime) of the registering data-node. */
+ StorageInfo storageInfo;
/**
* Default constructor.
+ * Creates a registration with no node name, no storage ID, info port -1
+ * and a default StorageInfo; presumably filled in later by readFields().
*/
public DatanodeRegistration() {
- this( 0, null, null, -1, null );
+ super( null, null, -1 );
+ this.storageInfo = new StorageInfo();
}
/**
* Create DatanodeRegistration
+ * @param nodeName data-node name, passed to the superclass
+ * @param infoPort info port, passed to the superclass
+ * @param storage data-node storage; supplies the storage ID, and a new
+ * StorageInfo is built from it
*/
+ public DatanodeRegistration(String nodeName,
+ int infoPort,
+ DataStorage storage ) {
+ super( nodeName, storage.getStorageID(), infoPort );
+ this.storageInfo = new StorageInfo( storage );
+ }
+
+ /**
+ * Create DatanodeRegistration
+ * <p>
+ * Note: the {@code version} and {@code registrationID} arguments are
+ * ignored. The registration gets a default StorageInfo, so
+ * {@link #getVersion()} and {@link #getRegistrationID()} are not derived
+ * from them.
+ * @deprecated
+ * use {@link #DatanodeRegistration(String, int, DataStorage)} instead
+ */
+ @Deprecated
public DatanodeRegistration(int version,
String nodeName,
String storageID,
int infoPort,
String registrationID ) {
super( nodeName, storageID, infoPort );
- this.version = version;
- this.registrationID = registrationID;
+ this.storageInfo = new StorageInfo();
}
/**
+ * @return the layout version held in this registration's storage info
*/
public int getVersion() {
- return version;
+ return storageInfo.getLayoutVersion();
}
/**
+ * @return the registration ID computed from this registration's storage
+ * info by {@link Storage#getRegistrationID}
*/
public String getRegistrationID() {
- return registrationID;
+ return Storage.getRegistrationID( storageInfo );
}
/////////////////////////////////////////////////
@@ -67,16 +77,18 @@
/**
+ * Serialize the superclass fields, followed by the storage info as
+ * layoutVersion (int), namespaceID (int) and cTime (long).
+ * The order must match {@link #readFields(DataInput)}.
*/
public void write(DataOutput out) throws IOException {
- out.writeInt(this.version);
super.write( out );
- UTF8.writeString(out, registrationID);
+ out.writeInt( storageInfo.getLayoutVersion() );
+ out.writeInt( storageInfo.getNamespaceID() );
+ out.writeLong( storageInfo.getCTime() );
}
/**
+ * Deserialize in the order used by {@link #write(DataOutput)}: the
+ * superclass fields, then layoutVersion, namespaceID and cTime, which
+ * are stored into the existing storage info object.
*/
public void readFields(DataInput in) throws IOException {
- this.version = in.readInt();
super.readFields(in);
- this.registrationID = UTF8.readString(in);
+ storageInfo.layoutVersion = in.readInt();
+ storageInfo.namespaceID = in.readInt();
+ storageInfo.cTime = in.readLong();
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Tue Apr 3 14:39:25 2007
@@ -304,6 +304,14 @@
}
/**
+ * Finalize previously upgraded file system state.
+ * Delegates to {@code dfs.finalizeUpgrade()}.
+ * @throws IOException
+ */
+ public void finalizeUpgrade() throws IOException {
+ dfs.finalizeUpgrade();
+ }
+
+ /**
* We need to find the blocks that didn't match. Likely only one
* is corrupt but we will report both to the namenode. In the future,
* we can consider figuring out exactly which block is corrupt.
@@ -400,6 +408,14 @@
public void refreshNodes() throws IOException {
((RawDistributedFileSystem)fs).refreshNodes();
}
+
+ /**
+ * Finalize previously upgraded file system state.
+ * Delegates to the wrapped RawDistributedFileSystem.
+ * @throws IOException
+ */
+ public void finalizeUpgrade() throws IOException {
+ ((RawDistributedFileSystem)fs).finalizeUpgrade();
+ }
+
/**
* We need to find the blocks that didn't match. Likely only one
* is corrupt but we will report both to the namenode. In the future,
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Tue Apr 3 14:39:25 2007
@@ -122,11 +122,23 @@
// SafeMode actions
public enum SafeModeAction{ SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET; }
+ // Startup options
+ // FORMAT: format the storage first, then start up as REGULAR
+ // (see FSDirectory.loadFSImage); ROLLBACK: restore the previous,
+ // pre-upgrade storage state if present; REGULAR: normal startup;
+ // UPGRADE: presumably requests an explicit upgrade -- confirm in Storage.
+ public enum StartupOption{ FORMAT, REGULAR, UPGRADE, ROLLBACK; }
+
+ /**
+ * Type of the node
+ * whose storage is described, e.g. DataStorage uses DATA_NODE.
+ */
+ static public enum NodeType {
+ NAME_NODE,
+ DATA_NODE;
+ }
+
// Version is reflected in the dfs image and edit log files.
// Version is reflected in the data storage file.
// Versions are negative.
- // Decrement DFS_CURRENT_VERSION to define a new version.
- public static final int DFS_CURRENT_VERSION = -3;
- // Current version: New operations OP_DATANODE_REMOVE and OP_DATANODE_ADD
- // are introduced
+ // Decrement LAYOUT_VERSION to define a new version.
+ public static final int LAYOUT_VERSION = -4;
+ // Current version:
+ // Top level directory is reorganized to allow file system state
+ // transitions: upgrade, rollback, and finalize.
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java Tue Apr 3 14:39:25 2007
@@ -118,7 +118,7 @@
if ( children == null || children.length == 0 ) {
children = new FSDir[maxBlocksPerDir];
for (int idx = 0; idx < maxBlocksPerDir; idx++) {
- children[idx] = new FSDir( new File(dir, "subdir"+idx) );
+ children[idx] = new FSDir( new File(dir, DataStorage.BLOCK_SUBDIR_PREFIX+idx) );
}
}
@@ -256,19 +256,19 @@
class FSVolume {
static final double USABLE_DISK_PCT_DEFAULT = 0.98f;
- private File dir;
+ private File dir; // TODO this field is redundant equals this.dataDir.dir.getParent()
private FSDir dataDir;
private File tmpDir;
private DF usage;
private long reserved;
private double usableDiskPct = USABLE_DISK_PCT_DEFAULT;
- FSVolume(File dir, Configuration conf) throws IOException {
+ FSVolume( File currentDir, Configuration conf) throws IOException {
this.reserved = conf.getLong("dfs.datanode.du.reserved", 0);
this.usableDiskPct = conf.getFloat("dfs.datanode.du.pct",
(float) USABLE_DISK_PCT_DEFAULT);
- this.dir = dir;
- this.dataDir = new FSDir(new File(dir, "data"));
+ this.dir = currentDir.getParentFile();
+ this.dataDir = new FSDir( currentDir );
this.tmpDir = new File(dir, "tmp");
if (tmpDir.exists()) {
FileUtil.fullyDelete(tmpDir);
@@ -431,11 +431,11 @@
/**
* An FSDataset has a directory where it loads its data files.
*/
- public FSDataset(File[] dirs, Configuration conf) throws IOException {
+ public FSDataset( DataStorage storage, Configuration conf) throws IOException {
this.maxBlocksPerDir = conf.getInt("dfs.datanode.numblocks", 64);
- FSVolume[] volArray = new FSVolume[dirs.length];
- for (int idx = 0; idx < dirs.length; idx++) {
- volArray[idx] = new FSVolume(dirs[idx], conf);
+ FSVolume[] volArray = new FSVolume[storage.getNumStorageDirs()];
+ for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
+ volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(), conf);
}
volumes = new FSVolumeSet(volArray);
volumeMap = new HashMap<Block,FSVolume>();
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java Tue Apr 3 14:39:25 2007
@@ -22,7 +22,6 @@
import java.io.*;
import java.util.*;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
@@ -315,13 +314,12 @@
TreeMap activeLocks = new TreeMap();
FSImage fsImage;
boolean ready = false;
- int namespaceID = 0; // TODO: move to FSImage class, it belongs there
// Metrics record
private MetricsRecord directoryMetrics = null;
- /** Access an existing dfs name directory. */
+ /**
+ * Create the directory with a new FSImage that has not been loaded yet.
+ * The name directories are supplied later through loadFSImage().
+ */
- public FSDirectory(File[] dirs) throws IOException {
- this.fsImage = new FSImage( dirs );
+ public FSDirectory() throws IOException {
+ this.fsImage = new FSImage();
initialize();
}
@@ -335,12 +333,23 @@
directoryMetrics = MetricsUtil.createRecord(metricsContext, "FSDirectory");
}
- void loadFSImage( Configuration conf ) throws IOException {
- fsImage.loadFSImage( conf );
+ /**
+ * Load the namespace image from the given name directories, then mark
+ * this directory as ready and wake up threads waiting on it.
+ *
+ * @param dataDirs name-node storage directories
+ * @param startOpt startup option; FORMAT formats the directories first
+ * and then proceeds as a REGULAR startup
+ * @throws IOException if formatting or loading fails; the image is
+ * closed before rethrowing a loading failure
+ */
+ void loadFSImage( Collection<File> dataDirs,
+ StartupOption startOpt ) throws IOException {
+ // format before starting up if requested
+ // NOTE(review): a failure inside format() does not close fsImage.
+ if( startOpt == StartupOption.FORMAT ) {
+ fsImage.setStorageDirectories( dataDirs );
+ fsImage.format();
+ startOpt = StartupOption.REGULAR;
+ }
+ try {
+ fsImage.recoverTransitionRead( dataDirs, startOpt );
+ } catch( IOException e ) {
+ fsImage.close();
+ throw e;
+ }
synchronized (this) {
this.ready = true;
this.notifyAll();
- fsImage.getEditLog().create();
}
}
@@ -353,7 +362,7 @@
* Shutdown the filestore
*/
public void close() throws IOException {
- fsImage.getEditLog().close();
+ // The edit log used to be closed directly here; presumably FSImage.close()
+ // now closes it as well -- confirm in FSImage.
+ fsImage.close();
}
/**
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java?view=diff&rev=525290&r1=525289&r2=525290
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java Tue Apr 3 14:39:25 2007
@@ -26,10 +26,8 @@
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.util.Iterator;
-import java.util.Vector;
+import java.util.ArrayList;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;
@@ -48,60 +46,86 @@
private static final byte OP_DATANODE_ADD = 5;
private static final byte OP_DATANODE_REMOVE = 6;
- private static final String FS_EDIT = "edits";
- private static final String FS_EDIT_NEW = "edits.new";
+ // one stream per storage directory, indexed like the storage directories
+ private ArrayList<EditLogOutputStream> editStreams = null;
+ private FSImage fsimage = null;
- private File[] editFiles = null;
- private File[] editFilesNew = null;
+ /**
+ * Edit log file stream opened in append mode that can force its
+ * buffered contents to the underlying device.
+ */
+ static class EditLogOutputStream extends DataOutputStream {
+ private FileDescriptor fd;
- DataOutputStream[] editStreams = null;
- FileDescriptor[] editDescriptors = null;
- private FSImage fsimage = null;
+ /** Open name for append and remember its descriptor for sync(). */
+ EditLogOutputStream( File name ) throws IOException {
+ super( new FileOutputStream( name, true )); // open for append
+ this.fd = ((FileOutputStream)out).getFD();
+ }
- FSEditLog(File[] fsDirs, FSImage image) throws IOException {
- fsimage = image;
- editFiles = new File[fsDirs.length];
- editFilesNew = new File[fsDirs.length];
- for (int idx = 0; idx < fsDirs.length; idx++) {
- editFiles[idx] = new File(fsDirs[idx], FS_EDIT);
- editFilesNew[idx] = new File(fsDirs[idx], FS_EDIT_NEW);
- }
- }
+ /** Flush buffered data, then sync the file descriptor to disk. */
+ void flushAndSync() throws IOException {
+ this.flush();
+ this.fd.sync();
+ }
+
+ /** Write the layout version header and force it to disk. */
+ void create() throws IOException {
+ writeInt( FSConstants.LAYOUT_VERSION );
+ flushAndSync();
+ }
+ }
- FSEditLog(File imageDir, FSImage image, String edits) throws IOException {
+ /** Create an edit log whose file locations are resolved through image. */
+ FSEditLog( FSImage image ) {
fsimage = image;
- editFiles = new File[1];
- editFiles[0] = new File(imageDir, edits);
- }
+ }
+
+ // The helpers below delegate storage-directory lookups to the FSImage.
+ private File getEditFile( int idx ) {
+ return fsimage.getEditFile( idx );
+ }
+
+ private File getEditNewFile( int idx ) {
+ return fsimage.getEditNewFile( idx );
+ }
+
+ private int getNumStorageDirs() {
+ return fsimage.getNumStorageDirs();
+ }
+
+ /** Number of edit streams; 0 while the stream list has not been created. */
+ int getNumEditStreams() {
+ return editStreams == null ? 0 : editStreams.size();
+ }
/**
+ * Open the edit log file of every storage directory for append.
+ * No version header is written here (see createEditLogFile()).
* Initialize the output stream for logging.
*
* @throws IOException
*/
- void create() throws IOException {
- editStreams = new DataOutputStream[editFiles.length];
- editDescriptors = new FileDescriptor[editFiles.length];
- for (int idx = 0; idx < editStreams.length; idx++) {
- FileOutputStream stream = new FileOutputStream(editFiles[idx]);
- editStreams[idx] = new DataOutputStream(stream);
- editDescriptors[idx] = stream.getFD();
- editStreams[idx].writeInt( FSConstants.DFS_CURRENT_VERSION );
+ void open() throws IOException {
+ int size = getNumStorageDirs();
+ if( editStreams == null )
+ editStreams = new ArrayList<EditLogOutputStream>( size );
+ // NOTE(review): size is read once; if the error path below removes a
+ // storage directory, this loop bound goes stale -- confirm.
+ for (int idx = 0; idx < size; idx++) {
+ File eFile = getEditFile( idx );
+ try {
+ EditLogOutputStream eStream = new EditLogOutputStream( eFile );
+ editStreams.add( eStream );
+ } catch (IOException e) {
+ FSNamesystem.LOG.warn( "Unable to open edit log file " + eFile );
+ // NOTE(review): processIOError(idx) asserts one stream per storage
+ // directory and removes editStreams.get(idx), but the stream for idx
+ // was never added. If editStreams was empty on entry this can trip
+ // that assert under -ea, or make remove() throw
+ // IndexOutOfBoundsException.
+ processIOError(idx);
+ idx--;
+ }
}
}
+ /**
+ * Create (or append to, if it already exists) the given file and write the
+ * edit log layout version header to it. The stream is always closed, even
+ * if writing the header fails.
+ */
+ void createEditLogFile( File name ) throws IOException {
+ EditLogOutputStream eStream = new EditLogOutputStream( name );
+ try {
+ eStream.create(); // writes LAYOUT_VERSION, then flushes and syncs
+ } finally {
+ eStream.close();
+ }
+ }
+
/**
- * Create edits.new if non existant.
+ * Create edits.new in each storage directory where it does not exist yet.
*/
void createNewIfMissing() throws IOException {
- for (int idx = 0; idx < editFilesNew.length; idx++) {
- if (!editFilesNew[idx].exists()) {
- FileOutputStream stream = new FileOutputStream(editFilesNew[idx]);
- DataOutputStream editStr = new DataOutputStream(stream);
- editStr.writeInt( FSConstants.DFS_CURRENT_VERSION );
- editStr.flush();
- editStr.close();
- }
+ for (int idx = 0; idx < getNumStorageDirs(); idx++) {
+ File newFile = getEditNewFile( idx );
+ // existing edits.new files are left untouched
+ if( ! newFile.exists() )
+ createEditLogFile( newFile );
}
}
@@ -112,16 +136,17 @@
if (editStreams == null) {
return;
}
- for (int idx = 0; idx < editStreams.length; idx++) {
+ for (int idx = 0; idx < editStreams.size(); idx++) {
+ EditLogOutputStream eStream = editStreams.get( idx );
try {
- editStreams[idx].flush();
- editDescriptors[idx].sync();
- editStreams[idx].close();
+ eStream.flushAndSync();
+ eStream.close();
} catch (IOException e) {
processIOError(idx);
idx--;
}
}
+ editStreams.clear();
}
/**
@@ -131,42 +156,13 @@
* server to exit
*/
void processIOError(int index) throws IOException {
- if (editStreams == null || editStreams.length == 1) {
+ if (editStreams == null || editStreams.size() == 1) {
throw new IOException("Checkpoint directories inaccessible.");
}
- assert(index < editFiles.length);
- assert(editFiles.length == editFilesNew.length);
- assert(editFiles.length == editStreams.length);
- int newsize = editStreams.length - 1;
- int oldsize = editStreams.length;
-
- //
- // save existing values and allocate space for new ones
- //
- File[] editFiles1 = editFiles;
- File[] editFilesNew1 = editFilesNew;
- DataOutputStream[] editStreams1 = editStreams;
- FileDescriptor[] editDescriptors1 = editDescriptors;
- editFiles = new File[newsize];
- editFilesNew = new File[newsize];
- editStreams = new DataOutputStream[newsize];
- editDescriptors = new FileDescriptor[newsize];
+ assert(index < getNumStorageDirs());
+ assert(getNumStorageDirs() == editStreams.size());
- //
- // copy values from old into new, skip the one with error.
- //
- for (int idx = 0; idx < index; idx++) {
- editFiles[idx] = editFiles1[idx];
- editFilesNew[idx] = editFilesNew1[idx];
- editStreams[idx] = editStreams1[idx];
- editDescriptors[idx] = editDescriptors1[idx];
- }
- for (int idx = index; idx < oldsize - 1; idx++) {
- editFiles[idx] = editFiles1[idx+1];
- editFilesNew[idx] = editFilesNew1[idx+1];
- editStreams[idx] = editStreams1[idx+1];
- editDescriptors[idx] = editDescriptors1[idx+1];
- }
+ editStreams.remove( index );
//
// Invoke the ioerror routine of the fsimage
//
@@ -174,41 +170,11 @@
}
/**
- * Delete specified editLog
- */
- void delete(int idx) throws IOException {
- if (editStreams != null) {
- try {
- editStreams[idx].close();
- } catch (IOException e) {
- processIOError(idx);
- }
- }
- if (!editFiles[idx].delete() || !editFilesNew[idx].delete()) {
- if (editStreams != null) {
- processIOError(idx);
- }
- }
- }
-
- /**
- * check if ANY edits log exists
- */
- boolean exists() throws IOException {
- for (int idx = 0; idx < editFiles.length; idx++) {
- if (editFiles[idx].exists()) {
- return true;
- }
- }
- return false;
- }
-
- /**
* check if ANY edits.new log exists
*/
boolean existsNew() throws IOException {
- for (int idx = 0; idx < editFilesNew.length; idx++) {
- if (editFilesNew[idx].exists()) {
+ for (int idx = 0; idx < getNumStorageDirs(); idx++) {
+ if (getEditNewFile( idx ).exists()) {
return true;
}
}
@@ -216,32 +182,11 @@
}
/**
- * check if a particular edits.new log exists
- */
- boolean existsNew(int idx) throws IOException {
- if (editFilesNew[idx].exists()) {
- return true;
- }
- return false;
- }
-
-
- /**
* Load an edit log, and apply the changes to the in-memory structure
* This is where we apply edits that we've been writing to disk all
* along.
*/
- int loadFSEdits(Configuration conf, int index) throws IOException {
- int numEdits = 0;
- numEdits = loadFSEdits(conf, editFiles[index]);
- if (editFilesNew[index].exists()) {
- numEdits += loadFSEdits(conf, editFilesNew[index]);
- }
- return numEdits;
- }
-
- int loadFSEdits( Configuration conf, File edits)
- throws IOException {
+ int loadFSEdits( File edits ) throws IOException {
FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
FSDirectory fsDir = fsNamesys.dir;
int numEdits = 0;
@@ -267,15 +212,15 @@
logVersion = 0;
else
logVersion = in.readInt();
- if( logVersion < FSConstants.DFS_CURRENT_VERSION ) // future version
+ if( logVersion < FSConstants.LAYOUT_VERSION ) // future version
throw new IOException(
"Unexpected version of the file system log file: "
+ logVersion
+ ". Current version = "
- + FSConstants.DFS_CURRENT_VERSION + "." );
+ + FSConstants.LAYOUT_VERSION + "." );
}
- short replication = (short)conf.getInt("dfs.replication", 3);
+ short replication = fsNamesys.getDefaultReplication();
try {
while (true) {
byte opcode = -1;
@@ -304,7 +249,7 @@
name = (UTF8) writables[0];
replication = Short.parseShort(
((UTF8)writables[1]).toString());
- replication = adjustReplication( replication, conf );
+ replication = adjustReplication( replication );
}
// get blocks
aw = new ArrayWritable(Block.class);
@@ -321,9 +266,7 @@
UTF8 repl = new UTF8();
src.readFields(in);
repl.readFields(in);
- replication = adjustReplication(
- fromLogReplication(repl),
- conf);
+ replication = adjustReplication( fromLogReplication(repl) );
fsDir.unprotectedSetReplication(src.toString(),
replication,
null);
@@ -383,17 +326,18 @@
}
}
- if( logVersion != FSConstants.DFS_CURRENT_VERSION ) // other version
+ if( logVersion != FSConstants.LAYOUT_VERSION ) // other version
numEdits++; // save this image asap
return numEdits;
}
- static short adjustReplication( short replication, Configuration conf) {
- short minReplication = (short)conf.getInt("dfs.replication.min", 1);
+ static short adjustReplication( short replication) {
+ FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
+ short minReplication = fsNamesys.getMinReplication();
if( replication<minReplication ) {
replication = minReplication;
}
- short maxReplication = (short)conf.getInt("dfs.replication.max", 512);
+ short maxReplication = fsNamesys.getMaxReplication();
if( replication>maxReplication ) {
replication = maxReplication;
}
@@ -404,18 +348,19 @@
* Write an operation to the edit log
*/
void logEdit(byte op, Writable w1, Writable w2) {
- for (int idx = 0; idx < editStreams.length; idx++) {
- synchronized (editStreams[idx]) {
+ assert this.getNumEditStreams() > 0 : "no editlog streams";
+ for (int idx = 0; idx < editStreams.size(); idx++) {
+ EditLogOutputStream eStream;
+ synchronized ( eStream = editStreams.get( idx ) ) {
try {
- editStreams[idx].write(op);
+ eStream.write(op);
if (w1 != null) {
- w1.write(editStreams[idx]);
+ w1.write( eStream );
}
if (w2 != null) {
- w2.write(editStreams[idx]);
+ w2.write( eStream );
}
- editStreams[idx].flush();
- editDescriptors[idx].sync();
+ eStream.flushAndSync();
} catch (IOException ie) {
try {
processIOError(idx);
@@ -500,12 +445,12 @@
* Return the size of the current EditLog
*/
long getEditLogSize() throws IOException {
- assert(editFiles.length == editStreams.length);
+ assert(getNumStorageDirs() == editStreams.size());
long size = 0;
- for (int idx = 0; idx < editFiles.length; idx++) {
- synchronized (editStreams[idx]) {
- assert(size == 0 || size == editFiles[idx].length());
- size = editFiles[idx].length();
+ for (int idx = 0; idx < getNumStorageDirs(); idx++) {
+ synchronized (editStreams.get( idx )) {
+ assert(size == 0 || size == getEditFile( idx ).length());
+ size = getEditFile( idx ).length();
}
}
return size;
@@ -527,12 +472,11 @@
//
// Open edits.new
//
- for (int idx = 0; idx < editFiles.length; idx++ ) {
+ for (int idx = 0; idx < getNumStorageDirs(); idx++ ) {
try {
- FileOutputStream stream = new FileOutputStream(editFilesNew[idx]);
- editStreams[idx] = new DataOutputStream(stream);
- editDescriptors[idx] = stream.getFD();
- editStreams[idx].writeInt( FSConstants.DFS_CURRENT_VERSION );
+ EditLogOutputStream eStream = new EditLogOutputStream( getEditNewFile( idx ));
+ eStream.create();
+ editStreams.add( eStream );
} catch (IOException e) {
processIOError(idx);
idx--;
@@ -541,41 +485,10 @@
}
/**
- * Closes the current edit log and opens edits.new.
- * If edits.new already exists, then ignore it.
- */
- void rollEditLogIfNeeded() throws IOException {
-
- //
- // Open edits.new
- //
- for (int idx = 0; idx < editFiles.length; idx++ ) {
- if (existsNew(idx)) {
- continue;
- }
- try {
- FileOutputStream stream = new FileOutputStream(editFilesNew[idx]);
- editStreams[idx] = new DataOutputStream(stream);
- editDescriptors[idx] = stream.getFD();
- editStreams[idx].writeInt( FSConstants.DFS_CURRENT_VERSION );
- } catch (IOException e) {
- processIOError(idx);
- idx--;
- }
- }
- }
- /**
* Removes the old edit log and renamed edits.new as edits.
* Reopens the edits file.
*/
void purgeEditLog() throws IOException {
- purgeEditLog(true);
- }
-
- /**
- * Removes the old edit log and renamed edits.new as edits.
- */
- void purgeEditLog(boolean reopenEdits) throws IOException {
//
// If edits.new does not exists, then return error.
//
@@ -588,14 +501,14 @@
//
// Delete edits and rename edits.new to edits.
//
- for (int idx = 0; idx < editFiles.length; idx++ ) {
- if (!editFilesNew[idx].renameTo(editFiles[idx])) {
+ for (int idx = 0; idx < getNumStorageDirs(); idx++ ) {
+ if (!getEditNewFile( idx ).renameTo(getEditFile( idx ))) {
//
// renameTo() fails on Windows if the destination
// file exists.
//
- editFiles[idx].delete();
- if (!editFilesNew[idx].renameTo(editFiles[idx])) {
+ getEditFile( idx ).delete();
+ if (!getEditNewFile( idx ).renameTo(getEditFile( idx ))) {
processIOError(idx);
idx--;
}
@@ -604,24 +517,13 @@
//
// Reopen all the edits logs.
//
- boolean append = true;
- for (int idx = 0; reopenEdits && idx < editStreams.length; idx++) {
- try {
- FileOutputStream stream = new FileOutputStream(editFiles[idx],
- append);
- editStreams[idx] = new DataOutputStream(stream);
- editDescriptors[idx] = stream.getFD();
- } catch (IOException e) {
- processIOError(idx);
- idx--;
- }
- }
+ open();
}
/**
* Return the name of the edit file
*/
File getFsEditName() throws IOException {
- return editFiles[0];
+ // edit file of the first storage directory
+ return getEditFile( 0 );
}
}