Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/09/06 00:17:17 UTC
svn commit: r440508 - in /lucene/hadoop/trunk: ./ conf/ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/util/ src/test/org/apache/hadoop/dfs/
Author: cutting
Date: Tue Sep 5 15:17:14 2006
New Revision: 440508
URL: http://svn.apache.org/viewvc?view=rev&rev=440508
Log:
Manage multiple volumes with a single DataNode. Contributed by Milind.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/conf/hadoop-default.xml
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFS.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=440508&r1=440507&r2=440508
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Sep 5 15:17:14 2006
@@ -114,6 +114,12 @@
replacing them with Text for better performance.
(Hairong Kuang via cutting)
+29. HADOOP-64. Manage multiple volumes with a single DataNode.
+ Previously DataNode would create a separate daemon per configured
+ volume, each with its own connection to the NameNode. Now all
+ volumes are handled by a single DataNode daemon, reducing the load
+ on the NameNode. (Milind Bhandarkar via cutting)
+
Release 0.5.0 - 2006-08-04
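Regarding the HADOOP-64 entry above, a minimal sketch of what the change means operationally, with hypothetical disk paths: dfs.data.dir may now name several directories, and a single daemon serves them all.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.dfs.DataNode;

    // Hypothetical paths; conf.getStrings("dfs.data.dir") splits on commas.
    Configuration conf = new Configuration();
    conf.set("dfs.data.dir", "/disk1/dfs/data,/disk2/dfs/data");
    DataNode.run(conf);  // one daemon, one NameNode connection, two volumes
                         // (run() throws IOException)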
Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=440508&r1=440507&r2=440508
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Tue Sep 5 15:17:14 2006
@@ -140,6 +140,14 @@
</property>
<property>
+ <name>dfs.client.buffer.dir</name>
+ <value>${hadoop.tmp.dir}/dfs/tmp</value>
+ <description>Determines where on the local filesystem a DFS client
+ should store its blocks before it sends them to the datanode.
+ </description>
+</property>
+
+<property>
<name>dfs.data.dir</name>
<value>${hadoop.tmp.dir}/dfs/data</value>
 <description>Determines where on the local filesystem a DFS data node
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=440508&r1=440507&r2=440508
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Tue Sep 5 15:17:14 2006
@@ -776,7 +776,7 @@
}
private File newBackupFile() throws IOException {
- File result = conf.getFile("dfs.data.dir",
+ File result = conf.getFile("dfs.client.buffer.dir",
"tmp"+File.separator+
"client-"+Math.abs(r.nextLong()));
result.deleteOnExit();
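The rename above decouples client-side buffering from datanode block storage. A hedged illustration of overriding the new property (path hypothetical):

    Configuration conf = new Configuration();
    // Fast local disk for the client's temporary block files; before this
    // patch they landed under dfs.data.dir alongside datanode blocks.
    conf.set("dfs.client.buffer.dir", "/fastdisk/hadoop/dfs/tmp");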
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?view=diff&rev=440508&r1=440507&r2=440508
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Tue Sep 5 15:17:14 2006
@@ -86,7 +86,6 @@
return new InetSocketAddress(host, port);
}
- private static Map subDataNodeList = null;
DatanodeProtocol namenode;
FSDataset data;
DatanodeRegistration dnRegistration;
@@ -148,12 +147,12 @@
DataNodeMetrics myMetrics = new DataNodeMetrics();
/**
- * Create the DataNode given a configuration and a dataDir.
- * 'dataDir' is where the blocks are stored.
+ * Create the DataNode given a configuration and an array of dataDirs.
+ * 'dataDirs' is where the blocks are stored.
*/
- public DataNode(Configuration conf, String datadir) throws IOException {
+ DataNode(Configuration conf, String[] dataDirs) throws IOException {
this(InetAddress.getLocalHost().getHostName(),
- new File(datadir),
+ dataDirs,
createSocketAddr(conf.get("fs.default.name", "local")), conf);
int infoServerPort = conf.getInt("dfs.datanode.info.port", 50075);
this.infoServer = new StatusHttpServer("datanode", infoServerPort, true);
@@ -176,11 +175,22 @@
* @see DataStorage
*/
private DataNode(String machineName,
- File datadir,
+ String[] dataDirs,
InetSocketAddress nameNodeAddr,
Configuration conf ) throws IOException {
- // get storage info and lock the data dir
- storage = new DataStorage( datadir );
+ File[] volumes = new File[dataDirs.length];
+ for (int idx = 0; idx < dataDirs.length; idx++) {
+ volumes[idx] = new File(dataDirs[idx]);
+ }
+ // get storage info and lock the data dirs
+ storage = new DataStorage( volumes );
+ int numDirs = storage.getNumLocked();
+ if (numDirs == 0) { // all data dirs are in use
+ throw new IOException("Cannot start multiple Datanode instances "
+ + "sharing the same data directories.\n"
+ + StringUtils.arrayToString(dataDirs) + " are locked. ");
+ }
+ volumes = storage.getLockedDirs();
// connect to name node
this.namenode = (DatanodeProtocol)
RPC.waitForProxy(DatanodeProtocol.class,
@@ -207,7 +217,7 @@
-1,
"" );
// initialize data node internal structure
- this.data = new FSDataset(datadir, conf);
+ this.data = new FSDataset(volumes, conf);
this.dataXceiveServer = new Daemon(new DataXceiveServer(ss));
long blockReportIntervalBasis =
@@ -251,7 +261,7 @@
dnRegistration = namenode.register( dnRegistration );
if( storage.getStorageID().equals("") ) {
storage.setStorageID( dnRegistration.getStorageID());
- storage.write();
+ storage.writeAll();
}
}
@@ -267,24 +277,11 @@
this.shouldRun = false;
((DataXceiveServer) this.dataXceiveServer.getRunnable()).kill();
try {
- this.storage.close();
+ this.storage.closeAll();
} catch (IOException ie) {
}
}
- /**
- * Shut down all datanodes that were started via the run(conf) method.
- * Returns only after shutdown is complete.
- */
- public static void shutdownAll(){
- if(subDataNodeList != null && !subDataNodeList.isEmpty()){
- for (Iterator iterator = subDataNodeList.keySet().iterator(); iterator.hasNext();) {
- DataNode dataNode = (DataNode) iterator.next();
- dataNode.shutdown();
- }
- }
- }
-
void handleDiskError( String errMsgr ) {
LOG.warn( "DataNode is shutting down.\n" + errMsgr );
try {
@@ -940,7 +937,7 @@
* Only stop when "shouldRun" is turned off (which can only happen at shutdown).
*/
public void run() {
- LOG.info("Starting DataNode in: "+data.data);
+ LOG.info("Starting DataNode in: "+data);
// start dataXceiveServer
dataXceiveServer.start();
@@ -966,40 +963,50 @@
} catch (InterruptedException ie) {
}
- LOG.info("Finishing DataNode in: "+data.data);
+ LOG.info("Finishing DataNode in: "+data);
}
- /** Start datanode daemons.
- * Start a datanode daemon for each comma separated data directory
- * specified in property dfs.data.dir
+ private static ArrayList dataNodeList = new ArrayList();
+ private static ArrayList dataNodeThreadList = new ArrayList();
+
+ /** Start datanode daemon.
*/
public static void run(Configuration conf) throws IOException {
String[] dataDirs = conf.getStrings("dfs.data.dir");
- subDataNodeList = new HashMap(dataDirs.length);
- for (int i = 0; i < dataDirs.length; i++) {
- DataNode dn = makeInstanceForDir(dataDirs[i], conf);
- if (dn != null) {
- Thread t = new Thread(dn, "DataNode: "+dataDirs[i]);
- t.setDaemon(true); // needed for JUnit testing
- t.start();
- subDataNodeList.put(dn,t);
- }
+ DataNode dn = makeInstance(dataDirs, conf);
+ dataNodeList.add(dn);
+ if (dn != null) {
+ Thread t = new Thread(dn, "DataNode: [" +
+ StringUtils.arrayToString(dataDirs) + "]");
+ t.setDaemon(true); // needed for JUnit testing
+ t.start();
+ dataNodeThreadList.add(t);
}
}
+
+ /**
+ * Shut down all datanodes that were started via the run(conf) method.
+ * Returns only after shutdown is complete.
+ */
+ public static void shutdownAll(){
+ if(!dataNodeList.isEmpty()){
+ for (Iterator iterator = dataNodeList.iterator(); iterator.hasNext();) {
+ DataNode dataNode = (DataNode) iterator.next();
+ dataNode.shutdown();
+ }
+ }
+ }
+
- /** Start datanode daemons.
- * Start a datanode daemon for each comma separated data directory
- * specified in property dfs.data.dir and wait for them to finish.
- * If this thread is specifically interrupted, it will stop waiting.
+ /** Start a single datanode daemon and wait for it to finish.
+ * If this thread is specifically interrupted, it will stop waiting.
*/
private static void runAndWait(Configuration conf) throws IOException {
run(conf);
-
- // Wait for sub threads to exit
- for (Iterator iterator = subDataNodeList.entrySet().iterator(); iterator.hasNext();) {
- Thread threadDataNode = (Thread) ((Map.Entry) iterator.next()).getValue();
+ if (dataNodeThreadList.size() > 0) {
+ Thread t = (Thread) dataNodeThreadList.remove(dataNodeThreadList.size()-1);
try {
- threadDataNode.join();
+ t.join();
} catch (InterruptedException e) {
if (Thread.currentThread().isInterrupted()) {
// did someone knock?
@@ -1010,25 +1017,29 @@
}
/**
- * Make an instance of DataNode after ensuring that given data directory
- * (and parent directories, if necessary) can be created.
- * @param dataDir where the new DataNode instance should keep its files.
+ * Make an instance of DataNode after ensuring that at least one of the
+ * given data directories (and their parent directories, if necessary)
+ * can be created.
+ * @param dataDirs List of directories, where the new DataNode instance should
+ * keep its files.
* @param conf Configuration instance to use.
- * @return DataNode instance for given data dir and conf, or null if directory
- * cannot be created.
+ * @return DataNode instance for given list of data dirs and conf, or null if
+ * no directory from this directory list can be created.
* @throws IOException
*/
- static DataNode makeInstanceForDir(String dataDir, Configuration conf) throws IOException {
- DataNode dn = null;
- File data = new File(dataDir);
- try {
+ static DataNode makeInstance(String[] dataDirs, Configuration conf)
+ throws IOException {
+ ArrayList dirs = new ArrayList();
+ for (int i = 0; i < dataDirs.length; i++) {
+ File data = new File(dataDirs[i]);
+ try {
DiskChecker.checkDir( data );
- dn = new DataNode(conf, dataDir);
- return dn;
- } catch( DiskErrorException e ) {
- LOG.warn("Can't start DataNode because " + e.getMessage() );
- return null;
+ dirs.add(dataDirs[i]);
+ } catch( DiskErrorException e ) {
+ LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage() );
+ }
}
+ return ((dirs.size() > 0) ? new DataNode(conf, dataDirs) : null);
}
public String toString() {
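A usage sketch for the new makeInstance() entry point, mirroring the updated tests later in this commit (paths hypothetical): it returns null only when no configured directory passes DiskChecker.checkDir().

    String[] dataDirs = { "/disk1/dfs/data", "/disk2/dfs/data" };  // hypothetical
    DataNode dn = DataNode.makeInstance(dataDirs, conf);
    if (dn != null) {
      new Thread(dn, "DataNode: [" + StringUtils.arrayToString(dataDirs) + "]").start();
    }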
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java?view=diff&rev=440508&r1=440507&r2=440508
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java Tue Sep 5 15:17:14 2006
@@ -4,6 +4,10 @@
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.UTF8;
@@ -11,33 +15,39 @@
* Data storage information file.
* <p>
* During startup the datanode reads its data storage file.
- * The data storage file is stored in the dfs.data.dir directory.
+ * The data storage file is stored in all the dfs.data.dir directories.
* It contains version and storageID.
- * Datanode holds a lock on the dataStorage file while it runs so that other
+ * Datanode holds a lock on all the dataStorage files while it runs so that other
 * datanodes are not able to start working with the same data storage.
- * The lock is released when the datanode stops (normally or abnormally).
+ * The locks are released when the datanode stops (normally or abnormally).
*
* @author Konstantin Shvachko
*/
class DataStorage {
public static final String STORAGE_INFO_FILE_NAME = "storage";
+ public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.DataStorage");
// persistent fields
private int version = 0; /// stored version
private String storageID; /// unique per cluster storageID
// non persistent fields
- private RandomAccessFile storageFile = null;
- private FileLock storageLock = null;
+ private ArrayList storageFiles = new ArrayList();
+ private ArrayList storageLocks = new ArrayList();
+
+ // cache away the names of locked dirs
+ private File[] dirs = null;
+
+ private int numLocked = 0;
/**
* Create DataStorage and verify its version.
*
- * @param datadir data storage directory
+ * @param dataDirs array of data storage directories
* @throws IOException
*/
- public DataStorage( File datadir ) throws IOException {
- this( DataNode.DFS_CURRENT_VERSION, datadir );
+ public DataStorage( File[] dataDirs ) throws IOException {
+ this( DataNode.DFS_CURRENT_VERSION, dataDirs );
if( version < FSConstants.DFS_CURRENT_VERSION ) // future version
throw new IncorrectVersionException( version, "data storage" );
@@ -46,29 +56,43 @@
/**
* Create DataStorage.
*
- * Read data storage file if exists or create it if not.
- * Lock the file.
+ * Read data storage files if they exist or create them if not.
+ * Lock the files.
*
* @param curVersion can be used to read file saved with a previous version.
- * @param datadir data storage directory
+ * @param dataDirs Array of data storage directories
* @throws IOException
*/
- public DataStorage( int curVersion, File datadir ) throws IOException {
+ public DataStorage( int curVersion, File[] dataDirs ) throws IOException {
this.version = curVersion;
- storageFile = new RandomAccessFile(
- new File(datadir, STORAGE_INFO_FILE_NAME ),
- "rws" );
- lock();
- boolean needToSave;
- try {
- needToSave = read();
- } catch( java.io.EOFException e ) {
- storageID = "";
- needToSave = true;
- }
+ for (int idx = 0; idx < dataDirs.length; idx++) {
+ storageFiles.add(idx, new RandomAccessFile(
+ new File(dataDirs[idx], STORAGE_INFO_FILE_NAME ),
+ "rws" ));
+ lock(idx);
+ boolean needToSave;
+ try {
+ needToSave = read(idx);
+ } catch( java.io.EOFException e ) {
+ storageID = "";
+ needToSave = true;
+ }
- if( needToSave )
- write();
+ if( needToSave ) { write(idx); }
+
+ RandomAccessFile file = (RandomAccessFile) storageFiles.get(idx);
+ if (file != null) { numLocked++; }
+ }
+ if (numLocked > 0) {
+ this.dirs = new File[numLocked];
+ int curidx = 0;
+ for (int idx = 0; idx < dataDirs.length; idx++) {
+ if (storageFiles.get(idx) != null) {
+ dirs[curidx] = dataDirs[idx];
+ curidx++;
+ }
+ }
+ }
}
public int getVersion() {
@@ -79,6 +103,14 @@
return storageID;
}
+ public int getNumLocked() {
+ return numLocked;
+ }
+
+ public File[] getLockedDirs() {
+ return dirs;
+ }
+
public void setStorageID( String newStorageID ) {
this.storageID = newStorageID;
}
@@ -88,58 +120,93 @@
}
/**
- * Lock datastoarge file.
+ * Lock datastorage file.
*
* @throws IOException
*/
- public void lock() throws IOException {
- storageLock = storageFile.getChannel().tryLock();
- if( storageLock == null )
- throw new IOException( "Cannot start multiple Datanode instances "
- + "sharing the same data directory.\n"
- + STORAGE_INFO_FILE_NAME + " is locked. ");
+ private void lock(int idx) throws IOException {
+ RandomAccessFile file = (RandomAccessFile) storageFiles.get(idx);
+ FileLock lock = file.getChannel().tryLock();
+ if (lock == null) {
+ // log a warning; the locked-dirs array is not populated yet at this
+ // point, so identify the volume by index
+ LOG.warn("Cannot lock storage file " + STORAGE_INFO_FILE_NAME + " in data directory #" + idx);
+ // null out this slot in storageFiles, and close the file
+ storageFiles.set(idx, null);
+ file.close();
+ }
+ storageLocks.add(idx, lock);
}
/**
- * Unlock datastoarge file.
+ * Unlock datastorage file.
+ * @param idx File index
*
* @throws IOException
*/
- public void unlock() throws IOException {
- storageLock.release();
+ private void unlock(int idx) throws IOException {
+ FileLock lock = (FileLock) storageLocks.get(idx);
+ if (lock != null) { lock.release(); }
}
/**
- * Close datastoarge file.
- *
+ * Close a datastorage file.
+ * @param idx file index
* @throws IOException
*/
- public void close() throws IOException {
- storageLock.release();
- storageFile.close();
+ private void close(int idx) throws IOException {
+ FileLock lock = (FileLock) storageLocks.get(idx);
+ if (lock == null) { return; }
+ lock.release();
+ RandomAccessFile file = (RandomAccessFile) storageFiles.get(idx);
+ file.close();
+ }
+
+ /**
+ * Close all datastorage files.
+ * @throws IOException
+ */
+ public void closeAll() throws IOException {
+ for (int idx = 0; idx < dirs.length; idx++) {
+ close(idx);
+ }
}
/**
* Read data storage file.
- *
+ * @param idx File index
* @return whether the data storage file need to be updated.
* @throws IOException
*/
- public boolean read() throws IOException {
- storageFile.seek(0);
- this.version = storageFile.readInt();
- this.storageID = UTF8.readString( storageFile );
+ private boolean read(int idx) throws IOException {
+ RandomAccessFile file = (RandomAccessFile) storageFiles.get(idx);
+ if (file == null) { return false; }
+ file.seek(0);
+ this.version = file.readInt();
+ this.storageID = UTF8.readString( file );
return false;
}
/**
* Write data storage file.
- *
+ * @param idx File index
* @throws IOException
*/
- public void write() throws IOException {
- storageFile.seek(0);
- storageFile.writeInt( this.version );
- UTF8.writeString( storageFile, this.storageID );
+ private void write(int idx) throws IOException {
+ RandomAccessFile file = (RandomAccessFile) storageFiles.get(idx);
+ if (file == null) { return; }
+ file.seek(0);
+ file.writeInt( this.version );
+ UTF8.writeString( file, this.storageID );
}
+
+ /**
+ * Write all data storage files.
+ * @throws IOException
+ */
+ public void writeAll() throws IOException {
+ for (int idx = 0; idx < dirs.length; idx++) {
+ write(idx);
+ }
+ }
+
}
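A standalone sketch of the per-directory locking scheme DataStorage now applies, using java.nio and a hypothetical directory: FileChannel.tryLock() returns null when another process already holds the lock, which is how a second datanode instance detects a volume in use.

    import java.io.File;
    import java.io.RandomAccessFile;
    import java.nio.channels.FileLock;

    File dir = new File("/disk1/dfs/data");  // hypothetical volume
    RandomAccessFile file = new RandomAccessFile(new File(dir, "storage"), "rws");
    FileLock lock = file.getChannel().tryLock();
    if (lock == null) {
      file.close();  // volume claimed by another instance; skip it
    }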
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java?view=diff&rev=440508&r1=440507&r2=440508
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java Tue Sep 5 15:17:14 2006
@@ -32,7 +32,6 @@
***************************************************/
class FSDataset implements FSConstants {
- static final double USABLE_DISK_PCT_DEFAULT = 0.98f;
/**
* A node type that can be built into a tree reflecting the
@@ -40,92 +39,72 @@
*/
class FSDir {
File dir;
+ int numBlocks = 0;
+ int myIdx = 0;
FSDir children[];
+ FSDir siblings[];
/**
*/
- public FSDir(File dir) {
+ public FSDir(File dir, int myIdx, FSDir[] siblings) {
this.dir = dir;
+ this.myIdx = myIdx;
+ this.siblings = siblings;
this.children = null;
+ if (! dir.exists()) {
+ dir.mkdirs();
+ } else {
+ File[] files = dir.listFiles();
+ int numChildren = 0;
+ for (int idx = 0; idx < files.length; idx++) {
+ if (files[idx].isDirectory()) {
+ numChildren++;
+ } else if (Block.isBlockFilename(files[idx])) {
+ numBlocks++;
+ }
+ }
+ if (numChildren > 0) {
+ children = new FSDir[numChildren];
+ int curdir = 0;
+ for (int idx = 0; idx < files.length; idx++) {
+ if (files[idx].isDirectory()) {
+ children[curdir] = new FSDir(files[idx], curdir, children);
+ curdir++;
+ }
+ }
+ }
+ }
}
/**
*/
- public File getDirName() {
- return dir;
- }
-
- /**
- */
- public FSDir[] getChildren() {
- return children;
- }
-
- /**
- */
- public void addBlock(Block b, File src) {
- addBlock(b, src, b.getBlockId(), 0);
- }
-
- /**
- */
- void addBlock(Block b, File src, long blkid, int depth) {
- //
- // Add to the local dir, if no child dirs
- //
- if (children == null) {
- src.renameTo(new File(dir, b.getBlockName()));
-
- //
- // Test whether this dir's contents should be busted
- // up into subdirs.
- //
-
- // REMIND - mjc - sometime soon, we'll want this code
- // working. It prevents the datablocks from all going
- // into a single huge directory.
- /**
- File localFiles[] = dir.listFiles();
- if (localFiles.length == 16) {
- //
- // Create all the necessary subdirs
- //
- this.children = new FSDir[16];
- for (int i = 0; i < children.length; i++) {
- String str = Integer.toBinaryString(i);
- try {
- File subdir = new File(dir, "dir_" + str);
- subdir.mkdir();
- children[i] = new FSDir(subdir);
- } catch (StringIndexOutOfBoundsException excep) {
- excep.printStackTrace();
- System.out.println("Ran into problem when i == " + i + " an str = " + str);
- }
- }
-
- //
- // Move existing files into new dirs
- //
- for (int i = 0; i < localFiles.length; i++) {
- Block srcB = new Block(localFiles[i]);
- File dst = getBlockFilename(srcB, blkid, depth);
- if (!src.renameTo(dst)) {
- System.out.println("Unexpected problem in renaming " + src);
- }
- }
- }
- **/
+ public File addBlock(Block b, File src) {
+ if (numBlocks < maxBlocksPerDir) {
+ File dest = new File(dir, b.getBlockName());
+ src.renameTo(dest);
+ numBlocks += 1;
+ return dest;
} else {
- // Find subdir
- children[getHalfByte(blkid, depth)].addBlock(b, src, blkid, depth+1);
+ if (siblings != null && myIdx != (siblings.length-1)) {
+ File dest = siblings[myIdx+1].addBlock(b, src);
+ if (dest != null) { return dest; }
+ }
+ if (children == null) {
+ children = new FSDir[maxBlocksPerDir];
+ for (int idx = 0; idx < maxBlocksPerDir; idx++) {
+ children[idx] = new FSDir(
+ new File(dir, "subdir"+idx), idx, children);
+ }
+ }
+ return children[0].addBlock(b, src);
}
}
/**
- * Fill in the given blockSet with any child blocks
+ * Populate the given blockSet with any child blocks
* found at this node.
*/
- public void getBlockInfo(TreeSet blockSet) {
+ public void getBlockInfo(TreeSet<Block> blockSet) {
if (children != null) {
for (int i = 0; i < children.length; i++) {
children[i].getBlockInfo(blockSet);
@@ -140,38 +119,36 @@
}
}
- /**
- * Find the file that corresponds to the given Block
- */
- public File getBlockFilename(Block b) {
- return getBlockFilename(b, b.getBlockId(), 0);
- }
- /**
- * Helper method to find file for a Block
- */
- private File getBlockFilename(Block b, long blkid, int depth) {
- if (children == null) {
- return new File(dir, b.getBlockName());
- } else {
- //
- // Lift the 4 bits starting at depth, going left->right.
- // That means there are 2^4 possible children, or 16.
- // The max depth is thus ((len(long) / 4) == 16).
- //
- return children[getHalfByte(blkid, depth)].getBlockFilename(b, blkid, depth+1);
+ void getVolumeMap(HashMap<Block, FSVolume> volumeMap, FSVolume volume) {
+ if (children != null) {
+ for (int i = 0; i < children.length; i++) {
+ children[i].getVolumeMap(volumeMap, volume);
+ }
}
- }
- /**
- * Returns a number 0-15, inclusive. Pulls out the right
- * half-byte from the indicated long.
- */
- private int getHalfByte(long blkid, int halfByteIndex) {
- blkid = blkid >> ((15 - halfByteIndex) * 4);
- return (int) ((0x000000000000000F) & blkid);
+ File blockFiles[] = dir.listFiles();
+ for (int i = 0; i < blockFiles.length; i++) {
+ if (Block.isBlockFilename(blockFiles[i])) {
+ volumeMap.put(new Block(blockFiles[i], blockFiles[i].length()), volume);
+ }
+ }
}
+ void getBlockMap(HashMap<Block, File> blockMap) {
+ if (children != null) {
+ for (int i = 0; i < children.length; i++) {
+ children[i].getBlockMap(blockMap);
+ }
+ }
+
+ File blockFiles[] = dir.listFiles();
+ for (int i = 0; i < blockFiles.length; i++) {
+ if (Block.isBlockFilename(blockFiles[i])) {
+ blockMap.put(new Block(blockFiles[i], blockFiles[i].length()), blockFiles[i]);
+ }
+ }
+ }
/**
 * check if a data directory is healthy
* @throws DiskErrorException
@@ -195,50 +172,210 @@
}
}
+ class FSVolume {
+ static final double USABLE_DISK_PCT_DEFAULT = 0.98f;
+
+ private File dir;
+ private FSDir dataDir;
+ private File tmpDir;
+ private DF usage;
+ private long reserved;
+ private double usableDiskPct = USABLE_DISK_PCT_DEFAULT;
+
+ FSVolume(File dir, Configuration conf) throws IOException {
+ this.reserved = conf.getLong("dfs.datanode.du.reserved", 0);
+ this.usableDiskPct = conf.getFloat("dfs.datanode.du.pct",
+ (float) USABLE_DISK_PCT_DEFAULT);
+ this.dir = dir;
+ this.dataDir = new FSDir(new File(dir, "data"), 0, null);
+ this.tmpDir = new File(dir, "tmp");
+ if (tmpDir.exists()) {
+ FileUtil.fullyDelete(tmpDir);
+ }
+ tmpDir.mkdirs();
+ this.usage = new DF(dir, conf);
+ }
+
+ long getCapacity() throws IOException {
+ return usage.getCapacity();
+ }
+
+ long getAvailable() throws IOException {
+ return ((long) Math.round(usableDiskPct *
+ usage.getAvailable()) - reserved);
+ }
+
+ String getMount() throws IOException {
+ return usage.getMount();
+ }
+
+ File createTmpFile(Block b) throws IOException {
+ File f = new File(tmpDir, b.getBlockName());
+ try {
+ if (f.exists()) {
+ throw new IOException("Unexpected problem in creating temporary file for "+
+ b + ". File " + f + " should not be present, but is.");
+ }
+ // Create the zero-length temp file
+ //
+ if (!f.createNewFile()) {
+ throw new IOException("Unexpected problem in creating temporary file for "+
+ b + ". File " + f + " should be creatable, but is already present.");
+ }
+ } catch (IOException ie) {
+ System.out.println("Exception! " + ie);
+ throw ie;
+ }
+ reserved -= b.getNumBytes();
+ return f;
+ }
+
+ File addBlock(Block b, File f) {
+ return dataDir.addBlock(b, f);
+ }
+
+ void checkDirs() throws DiskErrorException {
+ dataDir.checkDirTree();
+ DiskChecker.checkDir(tmpDir);
+ }
+
+ void getBlockInfo(TreeSet<Block> blockSet) {
+ dataDir.getBlockInfo(blockSet);
+ }
+
+ void getVolumeMap(HashMap<Block, FSVolume> volumeMap) {
+ dataDir.getVolumeMap(volumeMap, this);
+ }
+
+ void getBlockMap(HashMap<Block, File> blockMap) {
+ dataDir.getBlockMap(blockMap);
+ }
+
+ public String toString() {
+ return dir.getAbsolutePath();
+ }
+ }
+
+ class FSVolumeSet {
+ FSVolume[] volumes = null;
+ int curVolume = 0;
+ HashMap<String,Long> mountMap = new HashMap<String,Long>();
+
+ FSVolumeSet(FSVolume[] volumes) {
+ this.volumes = volumes;
+ }
+
+ FSVolume getNextVolume(long blockSize) throws IOException {
+ int startVolume = curVolume;
+ while (true) {
+ FSVolume volume = volumes[curVolume];
+ curVolume = (curVolume + 1) % volumes.length;
+ if (volume.getAvailable() >= blockSize) { return volume; }
+ if (curVolume == startVolume) {
+ throw new DiskOutOfSpaceException("Insufficient space for an additional block");
+ }
+ }
+ }
+
+ synchronized long getCapacity() throws IOException {
+ for (int idx = 0; idx < volumes.length; idx++) {
+ String mount = volumes[idx].getMount();
+ Long capacity = new Long(volumes[idx].getCapacity());
+ mountMap.put(mount, capacity);
+ }
+ long capacity = 0L;
+ for (Iterator<Long> iter = mountMap.values().iterator(); iter.hasNext();) {
+ capacity += iter.next().longValue();
+ }
+ return capacity;
+ }
+
+ synchronized long getRemaining() throws IOException {
+ for (int idx = 0; idx < volumes.length; idx++) {
+ String mount = volumes[idx].getMount();
+ Long remaining = new Long(volumes[idx].getAvailable());
+ mountMap.put(mount, remaining);
+ }
+ long remaining = 0L;
+ for (Iterator<Long> iter = mountMap.values().iterator(); iter.hasNext();) {
+ remaining += iter.next().longValue();
+ }
+ return remaining;
+ }
+
+ void getBlockInfo(TreeSet<Block> blockSet) {
+ for (int idx = 0; idx < volumes.length; idx++) {
+ volumes[idx].getBlockInfo(blockSet);
+ }
+ }
+
+ void getVolumeMap(HashMap<Block, FSVolume> volumeMap) {
+ for (int idx = 0; idx < volumes.length; idx++) {
+ volumes[idx].getVolumeMap(volumeMap);
+ }
+ }
+
+ void getBlockMap(HashMap<Block, File> blockMap) {
+ for (int idx = 0; idx < volumes.length; idx++) {
+ volumes[idx].getBlockMap(blockMap);
+ }
+ }
+
+ void checkDirs() throws DiskErrorException {
+ for (int idx = 0; idx < volumes.length; idx++) {
+ volumes[idx].checkDirs();
+ }
+ }
+
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ for (int idx = 0; idx < volumes.length; idx++) {
+ sb.append(volumes[idx].toString());
+ if (idx != volumes.length - 1) { sb.append(","); }
+ }
+ return sb.toString();
+ }
+ }
//////////////////////////////////////////////////////
//
// FSDataSet
//
//////////////////////////////////////////////////////
- DF diskUsage;
- File data = null, tmp = null;
- long reserved = 0;
- double usableDiskPct = USABLE_DISK_PCT_DEFAULT;
- FSDir dirTree;
- TreeSet ongoingCreates = new TreeSet();
+ FSVolumeSet volumes;
+ private HashMap<Block,File> ongoingCreates = new HashMap<Block,File>();
+ private int maxBlocksPerDir = 0;
+ private HashMap<Block,FSVolume> volumeMap = null;
+ private HashMap<Block,File> blockMap = null;
/**
* An FSDataset has a directory where it loads its data files.
*/
- public FSDataset(File dir, Configuration conf) throws IOException {
- this.reserved = conf.getLong("dfs.datanode.du.reserved", 0);
- this.usableDiskPct = conf.getFloat("dfs.datanode.du.pct", (float) USABLE_DISK_PCT_DEFAULT);
- diskUsage = new DF( dir, conf);
- this.data = new File(dir, "data");
- if (! data.exists()) {
- data.mkdirs();
- }
- this.tmp = new File(dir, "tmp");
- if (tmp.exists()) {
- FileUtil.fullyDelete(tmp);
- }
- this.tmp.mkdirs();
- this.dirTree = new FSDir(data);
+ public FSDataset(File[] dirs, Configuration conf) throws IOException {
+ this.maxBlocksPerDir = conf.getInt("dfs.datanode.numblocks", 64);
+ FSVolume[] volArray = new FSVolume[dirs.length];
+ for (int idx = 0; idx < dirs.length; idx++) {
+ volArray[idx] = new FSVolume(dirs[idx], conf);
+ }
+ volumes = new FSVolumeSet(volArray);
+ volumeMap = new HashMap<Block,FSVolume>();
+ volumes.getVolumeMap(volumeMap);
+ blockMap = new HashMap<Block,File>();
+ volumes.getBlockMap(blockMap);
}
/**
* Return total capacity, used and unused
*/
public long getCapacity() throws IOException {
- return diskUsage.getCapacity();
+ return volumes.getCapacity();
}
/**
* Return how many bytes can still be stored in the FSDataset
*/
public long getRemaining() throws IOException {
- return ((long) Math.round(usableDiskPct * diskUsage.getAvailable())) - reserved;
+ return volumes.getRemaining();
}
/**
@@ -263,19 +400,6 @@
}
/**
- * A Block b will be coming soon!
- */
- public boolean startBlock(Block b) throws IOException {
- //
- // Make sure the block isn't 'valid'
- //
- if (isValidBlock(b)) {
- throw new IOException("Block " + b + " is valid, and cannot be created.");
- }
- return true;
- }
-
- /**
* Start writing to a block file
*/
public OutputStream writeToBlock(Block b) throws IOException {
@@ -295,41 +419,17 @@
//
// Is it already in the create process?
//
- if (ongoingCreates.contains(b)) {
- throw new IOException("Block " + b + " has already been started (though not completed), and thus cannot be created.");
+ if (ongoingCreates.containsKey(b)) {
+ throw new IOException("Block " + b +
+ " has already been started (though not completed), and thus cannot be created.");
}
- //
- // Check if we have too little space
- //
- if (getRemaining() < blockSize) {
- throw new DiskOutOfSpaceException("Insufficient space for an additional block");
- }
-
- //
- // OK, all's well. Register the create, adjust
- // 'reserved' size, & create file
- //
- ongoingCreates.add(b);
- reserved += blockSize;
- f = getTmpFile(b);
- try {
- if (f.exists()) {
- throw new IOException("Unexpected problem in startBlock() for " + b + ". File " + f + " should not be present, but is.");
- }
-
- //
- // Create the zero-length temp file
- //
- if (!f.createNewFile()) {
- throw new IOException("Unexpected problem in startBlock() for " + b + ". File " + f + " should be creatable, but is already present.");
- }
- } catch (IOException ie) {
- System.out.println("Exception! " + ie);
- ongoingCreates.remove(b);
- reserved -= blockSize;
- throw ie;
- }
+ FSVolume v = volumes.getNextVolume(blockSize);
+
+ // create temporary file to hold block in the designated volume
+ f = v.createTmpFile(b);
+ ongoingCreates.put(b, f);
+ volumeMap.put(b, v);
}
//
@@ -352,49 +452,31 @@
* Complete the block write!
*/
public void finalizeBlock(Block b) throws IOException {
- File f = getTmpFile(b);
- if (! f.exists()) {
- throw new IOException("No temporary file " + f + " for block " + b);
- }
+ synchronized (ongoingCreates) {
+ File f = ongoingCreates.get(b);
+ if (f == null || ! f.exists()) {
+ throw new IOException("No temporary file " + f + " for block " + b);
+ }
+ long finalLen = f.length();
+ b.setNumBytes(finalLen);
+ FSVolume v = volumeMap.get(b);
- synchronized (ongoingCreates) {
- //
- // Make sure still registered as ongoing
- //
- if (! ongoingCreates.contains(b)) {
- throw new IOException("Tried to finalize block " + b + ", but not in ongoingCreates table");
- }
-
- long finalLen = f.length();
- b.setNumBytes(finalLen);
-
- //
- // Move the file
- // (REMIND - mjc - shame to move the file within a synch
- // section! Maybe remove this?)
- //
- dirTree.addBlock(b, f);
-
- //
- // Done, so deregister from ongoingCreates
- //
- if (! ongoingCreates.remove(b)) {
- throw new IOException("Tried to finalize block " + b + ", but could not find it in ongoingCreates after file-move!");
- }
- reserved -= b.getNumBytes();
- }
+ File dest = v.addBlock(b, f);
+ blockMap.put(b, dest);
+ ongoingCreates.remove(b);
+ }
}
/**
* Return a table of block data
*/
public Block[] getBlockReport() {
- TreeSet blockSet = new TreeSet();
- dirTree.getBlockInfo(blockSet);
+ TreeSet<Block> blockSet = new TreeSet<Block>();
+ volumes.getBlockInfo(blockSet);
Block blockTable[] = new Block[blockSet.size()];
int i = 0;
- for (Iterator it = blockSet.iterator(); it.hasNext(); i++) {
- blockTable[i] = (Block) it.next();
+ for (Iterator<Block> it = blockSet.iterator(); it.hasNext(); i++) {
+ blockTable[i] = it.next();
}
return blockTable;
}
@@ -404,11 +486,7 @@
*/
public boolean isValidBlock(Block b) {
File f = getFile(b);
- if (f.exists()) {
- return true;
- } else {
- return false;
- }
+ return (f != null && f.exists());
}
/**
@@ -417,31 +495,22 @@
* just get rid of it.
*/
public void invalidate(Block invalidBlks[]) throws IOException {
- for (int i = 0; i < invalidBlks.length; i++) {
- File f = getFile(invalidBlks[i]);
-
- // long len = f.length();
- if (!f.delete()) {
- throw new IOException("Unexpected error trying to delete block " + invalidBlks[i] + " at file " + f);
- }
- DataNode.LOG.info("Deleting block " + invalidBlks[i]);
- }
+ for (int i = 0; i < invalidBlks.length; i++) {
+ File f = getFile(invalidBlks[i]);
+ if (!f.delete()) {
+ throw new IOException("Unexpected error trying to delete block "
+ + invalidBlks[i] + " at file " + f);
+ }
+ blockMap.remove(invalidBlks[i]);
+ DataNode.LOG.info("Deleting block " + invalidBlks[i]);
+ }
}
/**
* Turn the block identifier into a filename.
*/
File getFile(Block b) {
- // REMIND - mjc - should cache this result for performance
- return dirTree.getBlockFilename(b);
- }
-
- /**
- * Get the temp file, if this block is still being created.
- */
- File getTmpFile(Block b) {
- // REMIND - mjc - should cache this result for performance
- return new File(tmp, b.getBlockName());
+ return blockMap.get(b);
}
/**
@@ -450,15 +519,12 @@
* @author hairong
*/
void checkDataDir() throws DiskErrorException {
- dirTree.checkDirTree();
- DiskChecker.checkDir( tmp );
+ volumes.checkDirs();
}
public String toString() {
- return "FSDataset{" +
- "dirpath='" + diskUsage.getDirPath() + "'" +
- "}";
+ return "FSDataset{dirpath='"+volumes+"'}";
}
}
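To summarize the new FSDataset write path, a paraphrase of writeToBlock() and finalizeBlock() using the names from the patch (error handling elided):

    // Round-robin over volumes until one has room for the block.
    FSVolume v = volumes.getNextVolume(blockSize);  // may throw DiskOutOfSpaceException
    File f = v.createTmpFile(b);                    // zero-length temp file in v's tmp dir
    ongoingCreates.put(b, f);                       // track the in-flight create
    volumeMap.put(b, v);                            // remember block -> volume
    // finalizeBlock(b) later moves f into v's data tree (fanning out into
    // "subdir<N>" children once a directory holds maxBlocksPerDir blocks)
    // and records the final file in blockMap, so getFile() is a map lookup
    // instead of the old tree walk.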
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java?view=diff&rev=440508&r1=440507&r2=440508
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java Tue Sep 5 15:17:14 2006
@@ -79,4 +79,22 @@
}
return numFormat.format(result) + suffix;
}
+
+ /**
+ * Given an array of strings, return a comma-separated list of its elements.
+ * @param strs Array of strings
+ * @return Empty string if strs.length is 0, comma separated list of strings
+ * otherwise
+ */
+
+ public static String arrayToString(String[] strs) {
+ if (strs.length == 0) { return ""; }
+ StringBuffer sbuf = new StringBuffer();
+ sbuf.append(strs[0]);
+ for (int idx = 1; idx < strs.length; idx++) {
+ sbuf.append(",");
+ sbuf.append(strs[idx]);
+ }
+ return sbuf.toString();
+ }
}
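A quick usage note for the new helper:

    StringUtils.arrayToString(new String[] {"/d1", "/d2"});  // returns "/d1,/d2"
    StringUtils.arrayToString(new String[0]);                // returns ""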
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFS.java?view=diff&rev=440508&r1=440507&r2=440508
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFS.java Tue Sep 5 15:17:14 2006
@@ -227,12 +227,13 @@
conf.set("fs.default.name", nameNodeSocketAddr);
for (int i = 0; i < initialDNcount; i++) {
// uniquely config real fs path for data storage for this datanode
- String dataDir = baseDirSpecified + "/datanode" + i;
- conf.set("dfs.data.dir", dataDir);
- DataNode dn = DataNode.makeInstanceForDir(dataDir, conf);
+ String dataDirs[] = new String[1];
+ dataDirs[0] = baseDirSpecified + "/datanode" + i;
+ conf.set("dfs.data.dir", dataDirs[0]);
+ DataNode dn = DataNode.makeInstance(dataDirs, conf);
if (dn != null) {
listOfDataNodeDaemons.add(dn);
- (new Thread(dn, "DataNode" + i + ": " + dataDir)).start();
+ (new Thread(dn, "DataNode" + i + ": " + dataDirs[0])).start();
}
}
try {
@@ -365,8 +366,7 @@
if (i != iDatanodeClosed) {
try {
if (checkDataDirsEmpty) {
- File dataDir = new File(dataNode.data.diskUsage.getDirPath());
- assertNoBlocks(dataDir);
+ assertNoBlocks(dataNode);
}
dataNode.shutdown();
@@ -408,18 +408,13 @@
msg(summarizeThreadGroup());
}
- private void assertNoBlocks(File datanodeDir) {
- File datanodeDataDir = new File(datanodeDir, "data");
- String[] blockFilenames =
- datanodeDataDir.list(
- new FilenameFilter() {
- public boolean accept(File dir, String name){
- return Block.isBlockFilename(new File(dir, name));}});
+ private void assertNoBlocks(DataNode dn) {
+ Block[] blocks = dn.data.getBlockReport();
// if this fails, the delete did not propagate because either
// awaitQuiescence() returned before the disk images were removed
// or a real failure was detected.
- assertTrue(" data dir not empty: " + datanodeDataDir,
- blockFilenames.length==0);
+ assertTrue(" data dir not empty: " + dn.data.volumes,
+ blocks.length==0);
}
/**
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java?view=diff&rev=440508&r1=440507&r2=440508
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java Tue Sep 5 15:17:14 2006
@@ -357,12 +357,13 @@
//
for (int i = 0; i < dataNodeNum; i++) {
// uniquely config real fs path for data storage for this datanode
- String dataDir = baseDirSpecified + "/datanode" + i;
- conf.set("dfs.data.dir", dataDir);
- DataNode dn = DataNode.makeInstanceForDir(dataDir, conf);
+ String dataDir[] = new String[1];
+ dataDir[0] = baseDirSpecified + "/datanode" + i;
+ conf.set("dfs.data.dir", dataDir[0]);
+ DataNode dn = DataNode.makeInstance(dataDir, conf);
if (dn != null) {
dataNodeDaemons.add(dn);
- (new Thread(dn, "DataNode" + i + ": " + dataDir)).start();
+ (new Thread(dn, "DataNode" + i + ": " + dataDir[0])).start();
}
}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java?view=diff&rev=440508&r1=440507&r2=440508
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java Tue Sep 5 15:17:14 2006
@@ -71,9 +71,12 @@
*/
public void run() {
try {
- File dataDir = new File(conf.get("dfs.data.dir"));
- dataDir.mkdirs();
- node = new DataNode(conf, dataDir.getPath());
+ String[] dirs = conf.getStrings("dfs.data.dir");
+ for (int idx = 0; idx < dirs.length; idx++) {
+ File dataDir = new File(dirs[idx]);
+ dataDir.mkdirs();
+ }
+ node = new DataNode(conf, dirs);
node.run();
} catch (Throwable e) {
node = null;
@@ -105,7 +108,8 @@
File base_dir = new File(System.getProperty("test.build.data"),
"dfs/");
conf.set("dfs.name.dir", new File(base_dir, "name").getPath());
- conf.set("dfs.data.dir", new File(base_dir, "data").getPath());
+ conf.set("dfs.data.dir", new File(base_dir, "data1").getPath()+","+
+ new File(base_dir, "data2").getPath());
conf.setInt("dfs.replication", 1);
// this timeout seems to control the minimum time for the test, so
// decrease it considerably.