You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/04/16 23:44:46 UTC
svn commit: r529410 [6/27] - in /lucene/hadoop/trunk: ./
src/contrib/abacus/src/examples/org/apache/hadoop/abacus/examples/
src/contrib/abacus/src/java/org/apache/hadoop/abacus/
src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ src/c...
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java Mon Apr 16 14:44:35 2007
@@ -75,18 +75,18 @@
}
DatanodeInfo( DatanodeID nodeID ) {
- super( nodeID );
- this.capacity = 0L;
- this.remaining = 0L;
- this.lastUpdate = 0L;
- this.xceiverCount = 0;
- this.adminState = null;
+ super( nodeID );
+ this.capacity = 0L;
+ this.remaining = 0L;
+ this.lastUpdate = 0L;
+ this.xceiverCount = 0;
+ this.adminState = null;
}
DatanodeInfo( DatanodeID nodeID, String location, String hostName ) {
- this(nodeID);
- this.location = location;
- this.hostName = hostName;
+ this(nodeID);
+ this.location = location;
+ this.hostName = hostName;
}
/** The raw capacity. */
@@ -130,7 +130,7 @@
}
public String getPath() {
- return location+NodeBase.PATH_SEPARATOR_STR+name;
+ return location+NodeBase.PATH_SEPARATOR_STR+name;
}
@@ -150,7 +150,7 @@
long u = c - r;
buffer.append("Name: "+name+"\n");
if(!NetworkTopology.DEFAULT_RACK.equals(location)) {
- buffer.append("Rack: "+location+"\n");
+ buffer.append("Rack: "+location+"\n");
}
if (isDecommissioned()) {
buffer.append("State : Decommissioned\n");
@@ -174,7 +174,7 @@
long u = c - r;
buffer.append(name);
if(!NetworkTopology.DEFAULT_RACK.equals(location)) {
- buffer.append(" "+location);
+ buffer.append(" "+location);
}
if (isDecommissioned()) {
buffer.append(" DD");
@@ -209,51 +209,51 @@
/**
* Returns true if the node is in the process of being decommissioned
*/
- boolean isDecommissionInProgress() {
- if (adminState == AdminStates.DECOMMISSION_INPROGRESS) {
- return true;
- }
- return false;
- }
+ boolean isDecommissionInProgress() {
+ if (adminState == AdminStates.DECOMMISSION_INPROGRESS) {
+ return true;
+ }
+ return false;
+ }
/**
* Returns true if the node has been decommissioned.
*/
- boolean isDecommissioned() {
- if (adminState == AdminStates.DECOMMISSIONED) {
- return true;
- }
- return false;
- }
+ boolean isDecommissioned() {
+ if (adminState == AdminStates.DECOMMISSIONED) {
+ return true;
+ }
+ return false;
+ }
/**
* Sets the admin state to indicate that decommision is complete.
*/
- void setDecommissioned() {
- adminState = AdminStates.DECOMMISSIONED;
- }
-
- /**
- * Retrieves the admin state of this node.
- */
- AdminStates getAdminState() {
- if (adminState == null) {
- return AdminStates.NORMAL;
- }
- return adminState;
- }
-
- /**
- * Sets the admin state of this node.
- */
- void setAdminState(AdminStates newState) {
- if (newState == AdminStates.NORMAL) {
- adminState = null;
- }
- else {
- adminState = newState;
- }
+ void setDecommissioned() {
+ adminState = AdminStates.DECOMMISSIONED;
+ }
+
+ /**
+ * Retrieves the admin state of this node.
+ */
+ AdminStates getAdminState() {
+ if (adminState == null) {
+ return AdminStates.NORMAL;
}
+ return adminState;
+ }
+
+ /**
+ * Sets the admin state of this node.
+ */
+ void setAdminState(AdminStates newState) {
+ if (newState == AdminStates.NORMAL) {
+ adminState = null;
+ }
+ else {
+ adminState = newState;
+ }
+ }
private int level; //which level of the tree the node resides
private Node parent; //its parent
@@ -301,7 +301,7 @@
this.xceiverCount = in.readInt();
this.location = Text.readString( in );
AdminStates newState = (AdminStates) WritableUtils.readEnum(in,
- AdminStates.class);
+ AdminStates.class);
setAdminState(newState);
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java Mon Apr 16 14:44:35 2007
@@ -64,47 +64,47 @@
* new storageID if the datanode did not have one and
* registration ID for further communication.
*/
- public DatanodeRegistration register( DatanodeRegistration registration,
- String networkLocation
+ public DatanodeRegistration register( DatanodeRegistration registration,
+ String networkLocation
) throws IOException;
- /**
- * sendHeartbeat() tells the NameNode that the DataNode is still
- * alive and well. Includes some status info, too.
- * It also gives the NameNode a chance to return a "DatanodeCommand" object.
- * A DatanodeCommand tells the DataNode to invalidate local block(s),
- * or to copy them to other DataNodes, etc.
- */
- public DatanodeCommand sendHeartbeat( DatanodeRegistration registration,
- long capacity, long remaining,
- int xmitsInProgress,
- int xceiverCount) throws IOException;
+ /**
+ * sendHeartbeat() tells the NameNode that the DataNode is still
+ * alive and well. Includes some status info, too.
+ * It also gives the NameNode a chance to return a "DatanodeCommand" object.
+ * A DatanodeCommand tells the DataNode to invalidate local block(s),
+ * or to copy them to other DataNodes, etc.
+ */
+ public DatanodeCommand sendHeartbeat( DatanodeRegistration registration,
+ long capacity, long remaining,
+ int xmitsInProgress,
+ int xceiverCount) throws IOException;
- /**
- * blockReport() tells the NameNode about all the locally-stored blocks.
- * The NameNode returns an array of Blocks that have become obsolete
- * and should be deleted. This function is meant to upload *all*
- * the locally-stored blocks. It's invoked upon startup and then
- * infrequently afterwards.
- */
- public DatanodeCommand blockReport( DatanodeRegistration registration,
- Block blocks[]) throws IOException;
+ /**
+ * blockReport() tells the NameNode about all the locally-stored blocks.
+ * The NameNode returns an array of Blocks that have become obsolete
+ * and should be deleted. This function is meant to upload *all*
+ * the locally-stored blocks. It's invoked upon startup and then
+ * infrequently afterwards.
+ */
+ public DatanodeCommand blockReport( DatanodeRegistration registration,
+ Block blocks[]) throws IOException;
- /**
- * blockReceived() allows the DataNode to tell the NameNode about
- * recently-received block data. For example, whenever client code
- * writes a new Block here, or another DataNode copies a Block to
- * this DataNode, it will call blockReceived().
- */
- public void blockReceived(DatanodeRegistration registration,
- Block blocks[]) throws IOException;
+ /**
+ * blockReceived() allows the DataNode to tell the NameNode about
+ * recently-received block data. For example, whenever client code
+ * writes a new Block here, or another DataNode copies a Block to
+ * this DataNode, it will call blockReceived().
+ */
+ public void blockReceived(DatanodeRegistration registration,
+ Block blocks[]) throws IOException;
- /**
- * errorReport() tells the NameNode about something that has gone
- * awry. Useful for debugging.
- */
- public void errorReport(DatanodeRegistration registration,
- int errorCode,
- String msg) throws IOException;
+ /**
+ * errorReport() tells the NameNode about something that has gone
+ * awry. Useful for debugging.
+ */
+ public void errorReport(DatanodeRegistration registration,
+ int errorCode,
+ String msg) throws IOException;
- public NamespaceInfo versionRequest() throws IOException;
+ public NamespaceInfo versionRequest() throws IOException;
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DfsPath.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DfsPath.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DfsPath.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DfsPath.java Mon Apr 16 14:44:35 2007
@@ -26,29 +26,29 @@
* a file with the names in a directory listing to make accesses faster.
*/
class DfsPath extends Path {
- DFSFileInfo info;
+ DFSFileInfo info;
- public DfsPath(DFSFileInfo info) {
- super(info.getPath());
- this.info = info;
- }
+ public DfsPath(DFSFileInfo info) {
+ super(info.getPath());
+ this.info = info;
+ }
- public boolean isDirectory() {
- return info.isDir();
- }
- public boolean isFile() {
- return ! isDirectory();
- }
- public long length() {
- return info.getLen();
- }
- public long getContentsLength() {
- return info.getContentsLen();
- }
- public short getReplication() {
- return info.getReplication();
- }
- public long getBlockSize() {
- return info.getBlockSize();
- }
+ public boolean isDirectory() {
+ return info.isDir();
+ }
+ public boolean isFile() {
+ return ! isDirectory();
+ }
+ public long length() {
+ return info.getLen();
+ }
+ public long getContentsLength() {
+ return info.getContentsLen();
+ }
+ public short getReplication() {
+ return info.getReplication();
+ }
+ public long getBlockSize() {
+ return info.getBlockSize();
+ }
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Mon Apr 16 14:44:35 2007
@@ -34,9 +34,9 @@
* @author Mike Cafarella
*****************************************************************/
public class DistributedFileSystem extends ChecksumFileSystem {
- private static class RawDistributedFileSystem extends FileSystem {
+ private static class RawDistributedFileSystem extends FileSystem {
private Path workingDir =
- new Path("/user", System.getProperty("user.name"));
+ new Path("/user", System.getProperty("user.name"));
private URI uri;
private FileSystem localFs;
@@ -48,7 +48,7 @@
/** @deprecated */
public RawDistributedFileSystem(InetSocketAddress namenode,
- Configuration conf) throws IOException {
+ Configuration conf) throws IOException {
initialize(URI.create("hdfs://"+
namenode.getHostName()+":"+
namenode.getPort()),
@@ -111,8 +111,8 @@
String result = makeAbsolute(file).toUri().getPath();
if (!FSNamesystem.isValidName(result)) {
throw new IllegalArgumentException("Pathname " + result + " from " +
- file +
- " is not a valid DFS filename.");
+ file +
+ " is not a valid DFS filename.");
}
return new UTF8(result);
}
@@ -130,10 +130,10 @@
}
public FSDataOutputStream create(Path f, boolean overwrite,
- int bufferSize, short replication, long blockSize,
- Progressable progress) throws IOException {
+ int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException {
if (exists(f) && ! overwrite) {
- throw new IOException("File already exists:"+f);
+ throw new IOException("File already exists:"+f);
}
Path parent = f.getParent();
if (parent != null && !mkdirs(parent)) {
@@ -141,14 +141,14 @@
}
return new FSDataOutputStream(
- dfs.create(getPath(f), overwrite,
- replication, blockSize, progress),
- bufferSize);
+ dfs.create(getPath(f), overwrite,
+ replication, blockSize, progress),
+ bufferSize);
}
public boolean setReplication( Path src,
- short replication
- ) throws IOException {
+ short replication
+ ) throws IOException {
return dfs.setReplication(getPath(src), replication);
}
@@ -163,36 +163,36 @@
* Get rid of Path f, whether a true file or dir.
*/
public boolean delete(Path f) throws IOException {
- return dfs.delete(getPath(f));
+ return dfs.delete(getPath(f));
}
public boolean exists(Path f) throws IOException {
- return dfs.exists(getPath(f));
+ return dfs.exists(getPath(f));
}
public boolean isDirectory(Path f) throws IOException {
- if (f instanceof DfsPath) {
- return ((DfsPath)f).isDirectory();
- }
- return dfs.isDirectory(getPath(f));
+ if (f instanceof DfsPath) {
+ return ((DfsPath)f).isDirectory();
+ }
+ return dfs.isDirectory(getPath(f));
}
public long getLength(Path f) throws IOException {
- if (f instanceof DfsPath) {
- return ((DfsPath)f).length();
- }
+ if (f instanceof DfsPath) {
+ return ((DfsPath)f).length();
+ }
- DFSFileInfo info[] = dfs.listPaths(getPath(f));
- return (info == null) ? 0 : info[0].getLen();
+ DFSFileInfo info[] = dfs.listPaths(getPath(f));
+ return (info == null) ? 0 : info[0].getLen();
}
public long getContentLength(Path f) throws IOException {
- if (f instanceof DfsPath) {
- return ((DfsPath)f).getContentsLength();
- }
+ if (f instanceof DfsPath) {
+ return ((DfsPath)f).getContentsLength();
+ }
- DFSFileInfo info[] = dfs.listPaths(getPath(f));
- return (info == null) ? 0 : info[0].getContentsLen();
+ DFSFileInfo info[] = dfs.listPaths(getPath(f));
+ return (info == null) ? 0 : info[0].getContentsLen();
}
public short getReplication(Path f) throws IOException {
@@ -202,44 +202,44 @@
DFSFileInfo info[] = dfs.listPaths(getPath(f));
return info[0].getReplication();
- }
+ }
public Path[] listPaths(Path f) throws IOException {
- DFSFileInfo info[] = dfs.listPaths(getPath(f));
- if (info == null) {
- return new Path[0];
- } else {
- Path results[] = new DfsPath[info.length];
- for (int i = 0; i < info.length; i++) {
- results[i] = new DfsPath(info[i]);
- }
- return results;
+ DFSFileInfo info[] = dfs.listPaths(getPath(f));
+ if (info == null) {
+ return new Path[0];
+ } else {
+ Path results[] = new DfsPath[info.length];
+ for (int i = 0; i < info.length; i++) {
+ results[i] = new DfsPath(info[i]);
}
+ return results;
+ }
}
public boolean mkdirs(Path f) throws IOException {
- return dfs.mkdirs(getPath(f));
+ return dfs.mkdirs(getPath(f));
}
/** @deprecated */ @Deprecated
- public void lock(Path f, boolean shared) throws IOException {
- dfs.lock(getPath(f), ! shared);
+ public void lock(Path f, boolean shared) throws IOException {
+ dfs.lock(getPath(f), ! shared);
}
/** @deprecated */ @Deprecated
- public void release(Path f) throws IOException {
- dfs.release(getPath(f));
+ public void release(Path f) throws IOException {
+ dfs.release(getPath(f));
}
@Override
- public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
- throws IOException {
+ public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
+ throws IOException {
FileUtil.copy(localFs, src, this, dst, delSrc, getConf());
}
@Override
- public void copyToLocalFile(boolean delSrc, Path src, Path dst)
- throws IOException {
+ public void copyToLocalFile(boolean delSrc, Path src, Path dst)
+ throws IOException {
FileUtil.copy(this, src, localFs, dst, delSrc, getConf());
}
@@ -257,27 +257,27 @@
}
public void close() throws IOException {
- super.close();
- dfs.close();
+ super.close();
+ dfs.close();
}
public String toString() {
- return "DFS[" + dfs + "]";
+ return "DFS[" + dfs + "]";
}
DFSClient getClient() {
- return dfs;
+ return dfs;
}
/** Return the total raw capacity of the filesystem, disregarding
* replication .*/
public long getRawCapacity() throws IOException{
- return dfs.totalRawCapacity();
+ return dfs.totalRawCapacity();
}
/** Return the total raw used space in the filesystem, disregarding
* replication .*/
public long getRawUsed() throws IOException{
- return dfs.totalRawUsed();
+ return dfs.totalRawUsed();
}
/** Return statistics for each datanode. */
@@ -291,7 +291,7 @@
* @see org.apache.hadoop.dfs.ClientProtocol#setSafeMode(FSConstants.SafeModeAction)
*/
public boolean setSafeMode( FSConstants.SafeModeAction action )
- throws IOException {
+ throws IOException {
return dfs.setSafeMode( action );
}
@@ -325,8 +325,8 @@
* we can consider figuring out exactly which block is corrupt.
*/
public boolean reportChecksumFailure(Path f,
- FSDataInputStream in, long inPos,
- FSDataInputStream sums, long sumsPos) {
+ FSDataInputStream in, long inPos,
+ FSDataInputStream sums, long sumsPos) {
LocatedBlock lblocks[] = new LocatedBlock[2];
@@ -365,81 +365,81 @@
return true;
}
- }
+ }
- public DistributedFileSystem() {
- super( new RawDistributedFileSystem() );
- }
+ public DistributedFileSystem() {
+ super( new RawDistributedFileSystem() );
+ }
- /** @deprecated */
- public DistributedFileSystem(InetSocketAddress namenode,
- Configuration conf) throws IOException {
- super( new RawDistributedFileSystem(namenode, conf) );
- }
+ /** @deprecated */
+ public DistributedFileSystem(InetSocketAddress namenode,
+ Configuration conf) throws IOException {
+ super( new RawDistributedFileSystem(namenode, conf) );
+ }
- @Override
+ @Override
public long getContentLength(Path f) throws IOException {
- return fs.getContentLength(f);
- }
+ return fs.getContentLength(f);
+ }
- /** Return the total raw capacity of the filesystem, disregarding
- * replication .*/
- public long getRawCapacity() throws IOException{
- return ((RawDistributedFileSystem)fs).getRawCapacity();
- }
+ /** Return the total raw capacity of the filesystem, disregarding
+ * replication .*/
+ public long getRawCapacity() throws IOException{
+ return ((RawDistributedFileSystem)fs).getRawCapacity();
+ }
- /** Return the total raw used space in the filesystem, disregarding
- * replication .*/
- public long getRawUsed() throws IOException{
- return ((RawDistributedFileSystem)fs).getRawUsed();
- }
+ /** Return the total raw used space in the filesystem, disregarding
+ * replication .*/
+ public long getRawUsed() throws IOException{
+ return ((RawDistributedFileSystem)fs).getRawUsed();
+ }
- /** Return statistics for each datanode. */
- public DatanodeInfo[] getDataNodeStats() throws IOException {
- return ((RawDistributedFileSystem)fs).getDataNodeStats();
- }
+ /** Return statistics for each datanode. */
+ public DatanodeInfo[] getDataNodeStats() throws IOException {
+ return ((RawDistributedFileSystem)fs).getDataNodeStats();
+ }
- /**
- * Enter, leave or get safe mode.
- *
- * @see org.apache.hadoop.dfs.ClientProtocol#setSafeMode(FSConstants.SafeModeAction)
- */
- public boolean setSafeMode( FSConstants.SafeModeAction action )
+ /**
+ * Enter, leave or get safe mode.
+ *
+ * @see org.apache.hadoop.dfs.ClientProtocol#setSafeMode(FSConstants.SafeModeAction)
+ */
+ public boolean setSafeMode( FSConstants.SafeModeAction action )
throws IOException {
- return ((RawDistributedFileSystem)fs).setSafeMode( action );
- }
+ return ((RawDistributedFileSystem)fs).setSafeMode( action );
+ }
- /*
- * Refreshes the list of hosts and excluded hosts from the configured
- * files.
- */
- public void refreshNodes() throws IOException {
- ((RawDistributedFileSystem)fs).refreshNodes();
- }
+ /*
+ * Refreshes the list of hosts and excluded hosts from the configured
+ * files.
+ */
+ public void refreshNodes() throws IOException {
+ ((RawDistributedFileSystem)fs).refreshNodes();
+ }
- /**
- * Finalize previously upgraded files system state.
- */
- public void finalizeUpgrade() throws IOException {
- ((RawDistributedFileSystem)fs).finalizeUpgrade();
- }
+ /**
+ * Finalize previously upgraded files system state.
+ */
+ public void finalizeUpgrade() throws IOException {
+ ((RawDistributedFileSystem)fs).finalizeUpgrade();
+ }
- /*
- * Dumps dfs data structures into specified file.
- */
- public void metaSave(String pathname) throws IOException {
- ((RawDistributedFileSystem)fs).metaSave(pathname);
- }
+ /*
+ * Dumps dfs data structures into specified file.
+ */
+ public void metaSave(String pathname) throws IOException {
+ ((RawDistributedFileSystem)fs).metaSave(pathname);
+ }
- /**
- * We need to find the blocks that didn't match. Likely only one
- * is corrupt but we will report both to the namenode. In the future,
- * we can consider figuring out exactly which block is corrupt.
- */
- public boolean reportChecksumFailure(Path f,
- FSDataInputStream in, long inPos,
- FSDataInputStream sums, long sumsPos) {
- return ((RawDistributedFileSystem)fs).reportChecksumFailure(
- f, in, inPos, sums, sumsPos);
- }
+ /**
+ * We need to find the blocks that didn't match. Likely only one
+ * is corrupt but we will report both to the namenode. In the future,
+ * we can consider figuring out exactly which block is corrupt.
+ */
+ public boolean reportChecksumFailure(Path f,
+ FSDataInputStream in, long inPos,
+ FSDataInputStream sums, long sumsPos) {
+ return ((RawDistributedFileSystem)fs).reportChecksumFailure(
+ f, in, inPos, sums, sumsPos);
+ }
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Mon Apr 16 14:44:35 2007
@@ -25,120 +25,120 @@
* @author Mike Cafarella
************************************/
public interface FSConstants {
- public static int MIN_BLOCKS_FOR_WRITE = 5;
+ public static int MIN_BLOCKS_FOR_WRITE = 5;
- public static final long WRITE_COMPLETE = 0xcafae11a;
+ public static final long WRITE_COMPLETE = 0xcafae11a;
- //
- // IPC Opcodes
- //
- // Processed at namenode
- public static final byte OP_ERROR = (byte) 0;
- public static final byte OP_HEARTBEAT = (byte) 1;
- public static final byte OP_BLOCKRECEIVED = (byte) 2;
- public static final byte OP_BLOCKREPORT = (byte) 3;
- public static final byte OP_TRANSFERDATA = (byte) 4;
-
- // Processed at namenode, from client
- public static final byte OP_CLIENT_OPEN = (byte) 20;
- public static final byte OP_CLIENT_STARTFILE = (byte) 21;
- public static final byte OP_CLIENT_ADDBLOCK = (byte) 22;
- public static final byte OP_CLIENT_RENAMETO = (byte) 23;
- public static final byte OP_CLIENT_DELETE = (byte) 24;
- public static final byte OP_CLIENT_COMPLETEFILE = (byte) 25;
- public static final byte OP_CLIENT_LISTING = (byte) 26;
- public static final byte OP_CLIENT_OBTAINLOCK = (byte) 27;
- public static final byte OP_CLIENT_RELEASELOCK = (byte) 28;
- public static final byte OP_CLIENT_EXISTS = (byte) 29;
- public static final byte OP_CLIENT_ISDIR = (byte) 30;
- public static final byte OP_CLIENT_MKDIRS = (byte) 31;
- public static final byte OP_CLIENT_RENEW_LEASE = (byte) 32;
- public static final byte OP_CLIENT_ABANDONBLOCK = (byte) 33;
- public static final byte OP_CLIENT_RAWSTATS = (byte) 34;
- public static final byte OP_CLIENT_DATANODEREPORT = (byte) 35;
- public static final byte OP_CLIENT_DATANODE_HINTS = (byte) 36;
+ //
+ // IPC Opcodes
+ //
+ // Processed at namenode
+ public static final byte OP_ERROR = (byte) 0;
+ public static final byte OP_HEARTBEAT = (byte) 1;
+ public static final byte OP_BLOCKRECEIVED = (byte) 2;
+ public static final byte OP_BLOCKREPORT = (byte) 3;
+ public static final byte OP_TRANSFERDATA = (byte) 4;
+
+ // Processed at namenode, from client
+ public static final byte OP_CLIENT_OPEN = (byte) 20;
+ public static final byte OP_CLIENT_STARTFILE = (byte) 21;
+ public static final byte OP_CLIENT_ADDBLOCK = (byte) 22;
+ public static final byte OP_CLIENT_RENAMETO = (byte) 23;
+ public static final byte OP_CLIENT_DELETE = (byte) 24;
+ public static final byte OP_CLIENT_COMPLETEFILE = (byte) 25;
+ public static final byte OP_CLIENT_LISTING = (byte) 26;
+ public static final byte OP_CLIENT_OBTAINLOCK = (byte) 27;
+ public static final byte OP_CLIENT_RELEASELOCK = (byte) 28;
+ public static final byte OP_CLIENT_EXISTS = (byte) 29;
+ public static final byte OP_CLIENT_ISDIR = (byte) 30;
+ public static final byte OP_CLIENT_MKDIRS = (byte) 31;
+ public static final byte OP_CLIENT_RENEW_LEASE = (byte) 32;
+ public static final byte OP_CLIENT_ABANDONBLOCK = (byte) 33;
+ public static final byte OP_CLIENT_RAWSTATS = (byte) 34;
+ public static final byte OP_CLIENT_DATANODEREPORT = (byte) 35;
+ public static final byte OP_CLIENT_DATANODE_HINTS = (byte) 36;
- // Processed at datanode, back from namenode
- public static final byte OP_ACK = (byte) 40;
- public static final byte OP_TRANSFERBLOCKS = (byte) 41;
- public static final byte OP_INVALIDATE_BLOCKS = (byte) 42;
- public static final byte OP_FAILURE = (byte) 43;
-
- // Processed at client, back from namenode
- public static final byte OP_CLIENT_OPEN_ACK = (byte) 60;
- public static final byte OP_CLIENT_STARTFILE_ACK = (byte) 61;
- public static final byte OP_CLIENT_ADDBLOCK_ACK = (byte) 62;
- public static final byte OP_CLIENT_RENAMETO_ACK = (byte) 63;
- public static final byte OP_CLIENT_DELETE_ACK = (byte) 64;
- public static final byte OP_CLIENT_COMPLETEFILE_ACK = (byte) 65;
- public static final byte OP_CLIENT_TRYAGAIN = (byte) 66;
- public static final byte OP_CLIENT_LISTING_ACK = (byte) 67;
- public static final byte OP_CLIENT_OBTAINLOCK_ACK = (byte) 68;
- public static final byte OP_CLIENT_RELEASELOCK_ACK = (byte) 69;
- public static final byte OP_CLIENT_EXISTS_ACK = (byte) 70;
- public static final byte OP_CLIENT_ISDIR_ACK = (byte) 71;
- public static final byte OP_CLIENT_MKDIRS_ACK = (byte) 72;
- public static final byte OP_CLIENT_RENEW_LEASE_ACK = (byte) 73;
- public static final byte OP_CLIENT_ABANDONBLOCK_ACK = (byte) 74;
- public static final byte OP_CLIENT_RAWSTATS_ACK = (byte) 75;
- public static final byte OP_CLIENT_DATANODEREPORT_ACK = (byte) 76;
- public static final byte OP_CLIENT_DATANODE_HINTS_ACK = (byte) 77;
-
- // Processed at datanode stream-handler
- public static final byte OP_WRITE_BLOCK = (byte) 80;
- public static final byte OP_READ_BLOCK = (byte) 81;
- public static final byte OP_READSKIP_BLOCK = (byte) 82;
- public static final byte OP_READ_RANGE_BLOCK = (byte) 83;
-
- // Encoding types
- public static final byte RUNLENGTH_ENCODING = 0;
- public static final byte CHUNKED_ENCODING = 1;
-
- // Return codes for file create
- public static final int OPERATION_FAILED = 0;
- public static final int STILL_WAITING = 1;
- public static final int COMPLETE_SUCCESS = 2;
-
- // Chunk the block Invalidate message
- public static final int BLOCK_INVALIDATE_CHUNK = 100;
-
- //
- // Timeouts, constants
- //
- public static long HEARTBEAT_INTERVAL = 3;
- public static long BLOCKREPORT_INTERVAL = 60 * 60 * 1000;
- public static final long LEASE_SOFTLIMIT_PERIOD = 60 * 1000;
- public static final long LEASE_HARDLIMIT_PERIOD = 60 * LEASE_SOFTLIMIT_PERIOD;
- public static int READ_TIMEOUT = 60 * 1000;
-
- // We need to limit the length and depth of a path in the filesystem. HADOOP-438
- // Currently we set the maximum length to 8k characters and the maximum depth to 1k.
- public static int MAX_PATH_LENGTH = 8000;
- public static int MAX_PATH_DEPTH = 1000;
+ // Processed at datanode, back from namenode
+ public static final byte OP_ACK = (byte) 40;
+ public static final byte OP_TRANSFERBLOCKS = (byte) 41;
+ public static final byte OP_INVALIDATE_BLOCKS = (byte) 42;
+ public static final byte OP_FAILURE = (byte) 43;
+
+ // Processed at client, back from namenode
+ public static final byte OP_CLIENT_OPEN_ACK = (byte) 60;
+ public static final byte OP_CLIENT_STARTFILE_ACK = (byte) 61;
+ public static final byte OP_CLIENT_ADDBLOCK_ACK = (byte) 62;
+ public static final byte OP_CLIENT_RENAMETO_ACK = (byte) 63;
+ public static final byte OP_CLIENT_DELETE_ACK = (byte) 64;
+ public static final byte OP_CLIENT_COMPLETEFILE_ACK = (byte) 65;
+ public static final byte OP_CLIENT_TRYAGAIN = (byte) 66;
+ public static final byte OP_CLIENT_LISTING_ACK = (byte) 67;
+ public static final byte OP_CLIENT_OBTAINLOCK_ACK = (byte) 68;
+ public static final byte OP_CLIENT_RELEASELOCK_ACK = (byte) 69;
+ public static final byte OP_CLIENT_EXISTS_ACK = (byte) 70;
+ public static final byte OP_CLIENT_ISDIR_ACK = (byte) 71;
+ public static final byte OP_CLIENT_MKDIRS_ACK = (byte) 72;
+ public static final byte OP_CLIENT_RENEW_LEASE_ACK = (byte) 73;
+ public static final byte OP_CLIENT_ABANDONBLOCK_ACK = (byte) 74;
+ public static final byte OP_CLIENT_RAWSTATS_ACK = (byte) 75;
+ public static final byte OP_CLIENT_DATANODEREPORT_ACK = (byte) 76;
+ public static final byte OP_CLIENT_DATANODE_HINTS_ACK = (byte) 77;
+
+ // Processed at datanode stream-handler
+ public static final byte OP_WRITE_BLOCK = (byte) 80;
+ public static final byte OP_READ_BLOCK = (byte) 81;
+ public static final byte OP_READSKIP_BLOCK = (byte) 82;
+ public static final byte OP_READ_RANGE_BLOCK = (byte) 83;
+
+ // Encoding types
+ public static final byte RUNLENGTH_ENCODING = 0;
+ public static final byte CHUNKED_ENCODING = 1;
+
+ // Return codes for file create
+ public static final int OPERATION_FAILED = 0;
+ public static final int STILL_WAITING = 1;
+ public static final int COMPLETE_SUCCESS = 2;
+
+ // Chunk the block Invalidate message
+ public static final int BLOCK_INVALIDATE_CHUNK = 100;
+
+ //
+ // Timeouts, constants
+ //
+ public static long HEARTBEAT_INTERVAL = 3;
+ public static long BLOCKREPORT_INTERVAL = 60 * 60 * 1000;
+ public static final long LEASE_SOFTLIMIT_PERIOD = 60 * 1000;
+ public static final long LEASE_HARDLIMIT_PERIOD = 60 * LEASE_SOFTLIMIT_PERIOD;
+ public static int READ_TIMEOUT = 60 * 1000;
+
+ // We need to limit the length and depth of a path in the filesystem. HADOOP-438
+ // Currently we set the maximum length to 8k characters and the maximum depth to 1k.
+ public static int MAX_PATH_LENGTH = 8000;
+ public static int MAX_PATH_DEPTH = 1000;
- //TODO mb@media-style.com: should be conf injected?
- public static final int BUFFER_SIZE = new Configuration().getInt("io.file.buffer.size", 4096);
+ //TODO mb@media-style.com: should be conf injected?
+ public static final int BUFFER_SIZE = new Configuration().getInt("io.file.buffer.size", 4096);
- // SafeMode actions
- public enum SafeModeAction{ SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET; }
+ // SafeMode actions
+ public enum SafeModeAction{ SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET; }
- // Startup options
- public enum StartupOption{ FORMAT, REGULAR, UPGRADE, ROLLBACK; }
+ // Startup options
+ public enum StartupOption{ FORMAT, REGULAR, UPGRADE, ROLLBACK; }
- /**
- * Type of the node
- */
- static public enum NodeType {
- NAME_NODE,
- DATA_NODE;
- }
-
- // Version is reflected in the dfs image and edit log files.
- // Version is reflected in the data storage file.
- // Versions are negative.
- // Decrement LAYOUT_VERSION to define a new version.
- public static final int LAYOUT_VERSION = -4;
- // Current version:
- // Top level directory is reorganized to allow file system state
- // transitions: upgrade, rollback, and finalize.
+ /**
+ * Type of the node
+ */
+ static public enum NodeType {
+ NAME_NODE,
+ DATA_NODE;
+ }
+
+ // Version is reflected in the dfs image and edit log files.
+ // Version is reflected in the data storage file.
+ // Versions are negative.
+ // Decrement LAYOUT_VERSION to define a new version.
+ public static final int LAYOUT_VERSION = -4;
+ // Current version:
+ // Top level directory is reorganized to allow file system state
+ // transitions: upgrade, rollback, and finalize.
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDataset.java Mon Apr 16 14:44:35 2007
@@ -36,609 +36,609 @@
/**
- * A node type that can be built into a tree reflecting the
- * hierarchy of blocks on the local disk.
- */
- class FSDir {
- File dir;
- int numBlocks = 0;
- FSDir children[];
- int lastChildIdx = 0;
- /**
- */
- public FSDir(File dir)
- throws IOException {
- this.dir = dir;
- this.children = null;
- if (! dir.exists()) {
- if (! dir.mkdirs()) {
- throw new IOException("Mkdirs failed to create " +
- dir.toString());
- }
- } else {
- File[] files = dir.listFiles();
- int numChildren = 0;
- for (int idx = 0; idx < files.length; idx++) {
- if (files[idx].isDirectory()) {
- numChildren++;
- } else if (Block.isBlockFilename(files[idx])) {
- numBlocks++;
- }
- }
- if (numChildren > 0) {
- children = new FSDir[numChildren];
- int curdir = 0;
- for (int idx = 0; idx < files.length; idx++) {
- if (files[idx].isDirectory()) {
- children[curdir] = new FSDir(files[idx]);
- curdir++;
- }
- }
- }
+ * A node type that can be built into a tree reflecting the
+ * hierarchy of blocks on the local disk.
+ */
+ class FSDir {
+ File dir;
+ int numBlocks = 0;
+ FSDir children[];
+ int lastChildIdx = 0;
+ /**
+ */
+ public FSDir(File dir)
+ throws IOException {
+ this.dir = dir;
+ this.children = null;
+ if (! dir.exists()) {
+ if (! dir.mkdirs()) {
+ throw new IOException("Mkdirs failed to create " +
+ dir.toString());
+ }
+ } else {
+ File[] files = dir.listFiles();
+ int numChildren = 0;
+ for (int idx = 0; idx < files.length; idx++) {
+ if (files[idx].isDirectory()) {
+ numChildren++;
+ } else if (Block.isBlockFilename(files[idx])) {
+ numBlocks++;
+ }
+ }
+ if (numChildren > 0) {
+ children = new FSDir[numChildren];
+ int curdir = 0;
+ for (int idx = 0; idx < files.length; idx++) {
+ if (files[idx].isDirectory()) {
+ children[curdir] = new FSDir(files[idx]);
+ curdir++;
}
+ }
}
+ }
+ }
- public File addBlock( Block b, File src ) throws IOException {
- //First try without creating subdirectories
- File file = addBlock( b, src, false, false );
- return ( file != null ) ? file : addBlock( b, src, true, true );
- }
+ public File addBlock( Block b, File src ) throws IOException {
+ //First try without creating subdirectories
+ File file = addBlock( b, src, false, false );
+ return ( file != null ) ? file : addBlock( b, src, true, true );
+ }
- private File addBlock( Block b, File src, boolean createOk,
- boolean resetIdx ) throws IOException {
- if (numBlocks < maxBlocksPerDir) {
- File dest = new File(dir, b.getBlockName());
- src.renameTo(dest);
- numBlocks += 1;
- return dest;
- }
-
- if ( lastChildIdx < 0 && resetIdx ) {
- //reset so that all children will be checked
- lastChildIdx = random.nextInt( children.length );
- }
+ private File addBlock( Block b, File src, boolean createOk,
+ boolean resetIdx ) throws IOException {
+ if (numBlocks < maxBlocksPerDir) {
+ File dest = new File(dir, b.getBlockName());
+ src.renameTo(dest);
+ numBlocks += 1;
+ return dest;
+ }
- if ( lastChildIdx >= 0 && children != null ) {
- //Check if any child-tree has room for a block.
- for (int i=0; i < children.length; i++) {
- int idx = ( lastChildIdx + i )%children.length;
- File file = children[idx].addBlock( b, src, false, resetIdx );
- if ( file != null ) {
- lastChildIdx = idx;
- return file;
- }
- }
- lastChildIdx = -1;
- }
+ if ( lastChildIdx < 0 && resetIdx ) {
+ //reset so that all children will be checked
+ lastChildIdx = random.nextInt( children.length );
+ }
- if ( !createOk ) {
- return null;
- }
+ if ( lastChildIdx >= 0 && children != null ) {
+ //Check if any child-tree has room for a block.
+ for (int i=0; i < children.length; i++) {
+ int idx = ( lastChildIdx + i )%children.length;
+ File file = children[idx].addBlock( b, src, false, resetIdx );
+ if ( file != null ) {
+ lastChildIdx = idx;
+ return file;
+ }
+ }
+ lastChildIdx = -1;
+ }
- if ( children == null || children.length == 0 ) {
- children = new FSDir[maxBlocksPerDir];
- for (int idx = 0; idx < maxBlocksPerDir; idx++) {
- children[idx] = new FSDir( new File(dir, DataStorage.BLOCK_SUBDIR_PREFIX+idx) );
- }
- }
+ if ( !createOk ) {
+ return null;
+ }
- //now pick a child randomly for creating a new set of subdirs.
- lastChildIdx = random.nextInt( children.length );
- return children[ lastChildIdx ].addBlock( b, src, true, false );
+ if ( children == null || children.length == 0 ) {
+ children = new FSDir[maxBlocksPerDir];
+ for (int idx = 0; idx < maxBlocksPerDir; idx++) {
+ children[idx] = new FSDir( new File(dir, DataStorage.BLOCK_SUBDIR_PREFIX+idx) );
}
+ }
+
+ //now pick a child randomly for creating a new set of subdirs.
+ lastChildIdx = random.nextInt( children.length );
+ return children[ lastChildIdx ].addBlock( b, src, true, false );
+ }
- /**
- * Populate the given blockSet with any child blocks
- * found at this node.
- */
- public void getBlockInfo(TreeSet<Block> blockSet) {
- if (children != null) {
- for (int i = 0; i < children.length; i++) {
- children[i].getBlockInfo(blockSet);
- }
- }
+ /**
+ * Populate the given blockSet with any child blocks
+ * found at this node.
+ */
+ public void getBlockInfo(TreeSet<Block> blockSet) {
+ if (children != null) {
+ for (int i = 0; i < children.length; i++) {
+ children[i].getBlockInfo(blockSet);
+ }
+ }
- File blockFiles[] = dir.listFiles();
- for (int i = 0; i < blockFiles.length; i++) {
- if (Block.isBlockFilename(blockFiles[i])) {
- blockSet.add(new Block(blockFiles[i], blockFiles[i].length()));
- }
- }
+ File blockFiles[] = dir.listFiles();
+ for (int i = 0; i < blockFiles.length; i++) {
+ if (Block.isBlockFilename(blockFiles[i])) {
+ blockSet.add(new Block(blockFiles[i], blockFiles[i].length()));
}
+ }
+ }
- void getVolumeMap(HashMap<Block, FSVolume> volumeMap, FSVolume volume) {
- if (children != null) {
- for (int i = 0; i < children.length; i++) {
- children[i].getVolumeMap(volumeMap, volume);
- }
- }
+ void getVolumeMap(HashMap<Block, FSVolume> volumeMap, FSVolume volume) {
+ if (children != null) {
+ for (int i = 0; i < children.length; i++) {
+ children[i].getVolumeMap(volumeMap, volume);
+ }
+ }
- File blockFiles[] = dir.listFiles();
- for (int i = 0; i < blockFiles.length; i++) {
- if (Block.isBlockFilename(blockFiles[i])) {
- volumeMap.put(new Block(blockFiles[i], blockFiles[i].length()), volume);
- }
- }
+ File blockFiles[] = dir.listFiles();
+ for (int i = 0; i < blockFiles.length; i++) {
+ if (Block.isBlockFilename(blockFiles[i])) {
+ volumeMap.put(new Block(blockFiles[i], blockFiles[i].length()), volume);
}
+ }
+ }
- void getBlockMap(HashMap<Block, File> blockMap) {
- if (children != null) {
- for (int i = 0; i < children.length; i++) {
- children[i].getBlockMap(blockMap);
- }
- }
+ void getBlockMap(HashMap<Block, File> blockMap) {
+ if (children != null) {
+ for (int i = 0; i < children.length; i++) {
+ children[i].getBlockMap(blockMap);
+ }
+ }
- File blockFiles[] = dir.listFiles();
- for (int i = 0; i < blockFiles.length; i++) {
- if (Block.isBlockFilename(blockFiles[i])) {
- blockMap.put(new Block(blockFiles[i], blockFiles[i].length()), blockFiles[i]);
- }
- }
+ File blockFiles[] = dir.listFiles();
+ for (int i = 0; i < blockFiles.length; i++) {
+ if (Block.isBlockFilename(blockFiles[i])) {
+ blockMap.put(new Block(blockFiles[i], blockFiles[i].length()), blockFiles[i]);
}
- /**
- * check if a data diretory is healthy
- * @throws DiskErrorException
- * @author hairong
- */
- public void checkDirTree() throws DiskErrorException {
- DiskChecker.checkDir(dir);
+ }
+ }
+ /**
+ * check if a data diretory is healthy
+ * @throws DiskErrorException
+ * @author hairong
+ */
+ public void checkDirTree() throws DiskErrorException {
+ DiskChecker.checkDir(dir);
- if (children != null) {
- for (int i = 0; i < children.length; i++) {
- children[i].checkDirTree();
- }
- }
+ if (children != null) {
+ for (int i = 0; i < children.length; i++) {
+ children[i].checkDirTree();
}
+ }
+ }
- void clearPath(File f) {
- String root = dir.getAbsolutePath();
- String dir = f.getAbsolutePath();
- if ( dir.startsWith( root ) ) {
- String[] dirNames = dir.substring( root.length() ).
- split( File.separator + "subdir" );
- if ( clearPath( f, dirNames, 1 ) )
- return;
- }
- clearPath( f, null, -1 );
- }
+ void clearPath(File f) {
+ String root = dir.getAbsolutePath();
+ String dir = f.getAbsolutePath();
+ if ( dir.startsWith( root ) ) {
+ String[] dirNames = dir.substring( root.length() ).
+ split( File.separator + "subdir" );
+ if ( clearPath( f, dirNames, 1 ) )
+ return;
+ }
+ clearPath( f, null, -1 );
+ }
- /*
- * dirNames is an array of string integers derived from
- * usual directory structure data/subdirN/subdirXY/subdirM ...
- * If dirName array is non-null, we only check the child at
- * the children[dirNames[idx]]. This avoids iterating over
- * children in common case. If directory structure changes
- * in later versions, we need to revisit this.
- */
- private boolean clearPath( File f, String[] dirNames, int idx ) {
- if ( ( dirNames == null || idx == dirNames.length ) &&
- dir.compareTo(f) == 0) {
- numBlocks--;
- return true;
- }
+ /*
+ * dirNames is an array of string integers derived from
+ * usual directory structure data/subdirN/subdirXY/subdirM ...
+ * If dirName array is non-null, we only check the child at
+ * the children[dirNames[idx]]. This avoids iterating over
+ * children in common case. If directory structure changes
+ * in later versions, we need to revisit this.
+ */
+ private boolean clearPath( File f, String[] dirNames, int idx ) {
+ if ( ( dirNames == null || idx == dirNames.length ) &&
+ dir.compareTo(f) == 0) {
+ numBlocks--;
+ return true;
+ }
- if ( dirNames != null ) {
- //guess the child index from the directory name
- if ( idx > ( dirNames.length - 1 ) || children == null ) {
- return false;
- }
- int childIdx;
- try {
- childIdx = Integer.parseInt( dirNames[idx] );
- } catch ( NumberFormatException ignored ) {
- // layout changed? we could print a warning.
- return false;
- }
- return ( childIdx >= 0 && childIdx < children.length ) ?
- children[childIdx].clearPath( f, dirNames, idx+1 ) : false;
- }
-
- //guesses failed. back to blind iteration.
- if ( children != null ) {
- for(int i=0; i < children.length; i++) {
- if ( children[i].clearPath( f, null, -1 ) ){
- return true;
- }
- }
- }
+ if ( dirNames != null ) {
+ //guess the child index from the directory name
+ if ( idx > ( dirNames.length - 1 ) || children == null ) {
return false;
}
-
- public String toString() {
- return "FSDir{" +
- "dir=" + dir +
- ", children=" + (children == null ? null : Arrays.asList(children)) +
- "}";
+ int childIdx;
+ try {
+ childIdx = Integer.parseInt( dirNames[idx] );
+ } catch ( NumberFormatException ignored ) {
+ // layout changed? we could print a warning.
+ return false;
}
- }
+ return ( childIdx >= 0 && childIdx < children.length ) ?
+ children[childIdx].clearPath( f, dirNames, idx+1 ) : false;
+ }
- class FSVolume {
- static final double USABLE_DISK_PCT_DEFAULT = 0.98f;
-
- private FSDir dataDir;
- private File tmpDir;
- private DF usage;
- private long reserved;
- private double usableDiskPct = USABLE_DISK_PCT_DEFAULT;
-
- FSVolume( File currentDir, Configuration conf) throws IOException {
- this.reserved = conf.getLong("dfs.datanode.du.reserved", 0);
- this.usableDiskPct = conf.getFloat("dfs.datanode.du.pct",
- (float) USABLE_DISK_PCT_DEFAULT);
- File parent = currentDir.getParentFile();
- this.dataDir = new FSDir( currentDir );
- this.tmpDir = new File(parent, "tmp");
- if (tmpDir.exists()) {
- FileUtil.fullyDelete(tmpDir);
- }
- if (!tmpDir.mkdirs()) {
- if (!tmpDir.isDirectory()) {
- throw new IOException("Mkdirs failed to create " + tmpDir.toString());
+ //guesses failed. back to blind iteration.
+ if ( children != null ) {
+ for(int i=0; i < children.length; i++) {
+ if ( children[i].clearPath( f, null, -1 ) ){
+ return true;
}
}
- this.usage = new DF(parent, conf);
}
-
- long getCapacity() throws IOException {
- return usage.getCapacity();
+ return false;
+ }
+
+ public String toString() {
+ return "FSDir{" +
+ "dir=" + dir +
+ ", children=" + (children == null ? null : Arrays.asList(children)) +
+ "}";
+ }
+ }
+
+ class FSVolume {
+ static final double USABLE_DISK_PCT_DEFAULT = 0.98f;
+
+ private FSDir dataDir;
+ private File tmpDir;
+ private DF usage;
+ private long reserved;
+ private double usableDiskPct = USABLE_DISK_PCT_DEFAULT;
+
+ FSVolume( File currentDir, Configuration conf) throws IOException {
+ this.reserved = conf.getLong("dfs.datanode.du.reserved", 0);
+ this.usableDiskPct = conf.getFloat("dfs.datanode.du.pct",
+ (float) USABLE_DISK_PCT_DEFAULT);
+ File parent = currentDir.getParentFile();
+ this.dataDir = new FSDir( currentDir );
+ this.tmpDir = new File(parent, "tmp");
+ if (tmpDir.exists()) {
+ FileUtil.fullyDelete(tmpDir);
}
-
- long getAvailable() throws IOException {
- long capacity = usage.getCapacity();
- long freespace = Math.round(usage.getAvailableSkipRefresh() -
- capacity * (1 - usableDiskPct) - reserved);
- return ( freespace > 0 ) ? freespace : 0;
+ if (!tmpDir.mkdirs()) {
+ if (!tmpDir.isDirectory()) {
+ throw new IOException("Mkdirs failed to create " + tmpDir.toString());
+ }
}
+ this.usage = new DF(parent, conf);
+ }
- String getMount() throws IOException {
- return usage.getMount();
- }
+ long getCapacity() throws IOException {
+ return usage.getCapacity();
+ }
- File createTmpFile(Block b) throws IOException {
- File f = new File(tmpDir, b.getBlockName());
- try {
- if (f.exists()) {
- throw new IOException("Unexpected problem in creating temporary file for "+
- b + ". File " + f + " should not be present, but is.");
- }
- // Create the zero-length temp file
- //
- if (!f.createNewFile()) {
- throw new IOException("Unexpected problem in creating temporary file for "+
- b + ". File " + f + " should be creatable, but is already present.");
- }
- } catch (IOException ie) {
- System.out.println("Exception! " + ie);
- throw ie;
+ long getAvailable() throws IOException {
+ long capacity = usage.getCapacity();
+ long freespace = Math.round(usage.getAvailableSkipRefresh() -
+ capacity * (1 - usableDiskPct) - reserved);
+ return ( freespace > 0 ) ? freespace : 0;
+ }
+
+ String getMount() throws IOException {
+ return usage.getMount();
+ }
+
+ File createTmpFile(Block b) throws IOException {
+ File f = new File(tmpDir, b.getBlockName());
+ try {
+ if (f.exists()) {
+ throw new IOException("Unexpected problem in creating temporary file for "+
+ b + ". File " + f + " should not be present, but is.");
}
- return f;
+ // Create the zero-length temp file
+ //
+ if (!f.createNewFile()) {
+ throw new IOException("Unexpected problem in creating temporary file for "+
+ b + ". File " + f + " should be creatable, but is already present.");
+ }
+ } catch (IOException ie) {
+ System.out.println("Exception! " + ie);
+ throw ie;
}
+ return f;
+ }
- File addBlock(Block b, File f) throws IOException {
- return dataDir.addBlock(b, f);
- }
+ File addBlock(Block b, File f) throws IOException {
+ return dataDir.addBlock(b, f);
+ }
- void checkDirs() throws DiskErrorException {
- dataDir.checkDirTree();
- DiskChecker.checkDir(tmpDir);
- }
+ void checkDirs() throws DiskErrorException {
+ dataDir.checkDirTree();
+ DiskChecker.checkDir(tmpDir);
+ }
- void getBlockInfo(TreeSet<Block> blockSet) {
- dataDir.getBlockInfo(blockSet);
- }
+ void getBlockInfo(TreeSet<Block> blockSet) {
+ dataDir.getBlockInfo(blockSet);
+ }
- void getVolumeMap(HashMap<Block, FSVolume> volumeMap) {
- dataDir.getVolumeMap(volumeMap, this);
- }
+ void getVolumeMap(HashMap<Block, FSVolume> volumeMap) {
+ dataDir.getVolumeMap(volumeMap, this);
+ }
- void getBlockMap(HashMap<Block, File> blockMap) {
- dataDir.getBlockMap(blockMap);
- }
+ void getBlockMap(HashMap<Block, File> blockMap) {
+ dataDir.getBlockMap(blockMap);
+ }
- void clearPath(File f) {
- dataDir.clearPath(f);
- }
+ void clearPath(File f) {
+ dataDir.clearPath(f);
+ }
- public String toString() {
- return dataDir.dir.getAbsolutePath();
- }
+ public String toString() {
+ return dataDir.dir.getAbsolutePath();
}
+ }
- class FSVolumeSet {
- FSVolume[] volumes = null;
- int curVolume = 0;
-
- FSVolumeSet(FSVolume[] volumes) {
- this.volumes = volumes;
- }
-
- synchronized FSVolume getNextVolume(long blockSize) throws IOException {
- int startVolume = curVolume;
- while (true) {
- FSVolume volume = volumes[curVolume];
- curVolume = (curVolume + 1) % volumes.length;
- if (volume.getAvailable() >= blockSize) { return volume; }
- if (curVolume == startVolume) {
- throw new DiskOutOfSpaceException("Insufficient space for an additional block");
- }
- }
- }
+ class FSVolumeSet {
+ FSVolume[] volumes = null;
+ int curVolume = 0;
+
+ FSVolumeSet(FSVolume[] volumes) {
+ this.volumes = volumes;
+ }
- synchronized long getCapacity() throws IOException {
- long capacity = 0L;
- for (int idx = 0; idx < volumes.length; idx++) {
- capacity += volumes[idx].getCapacity();
+ synchronized FSVolume getNextVolume(long blockSize) throws IOException {
+ int startVolume = curVolume;
+ while (true) {
+ FSVolume volume = volumes[curVolume];
+ curVolume = (curVolume + 1) % volumes.length;
+ if (volume.getAvailable() >= blockSize) { return volume; }
+ if (curVolume == startVolume) {
+ throw new DiskOutOfSpaceException("Insufficient space for an additional block");
}
- return capacity;
}
+ }
- synchronized long getRemaining() throws IOException {
- long remaining = 0L;
- for (int idx = 0; idx < volumes.length; idx++) {
- remaining += volumes[idx].getAvailable();
- }
- return remaining;
+ synchronized long getCapacity() throws IOException {
+ long capacity = 0L;
+ for (int idx = 0; idx < volumes.length; idx++) {
+ capacity += volumes[idx].getCapacity();
}
+ return capacity;
+ }
- synchronized void getBlockInfo(TreeSet<Block> blockSet) {
- for (int idx = 0; idx < volumes.length; idx++) {
- volumes[idx].getBlockInfo(blockSet);
- }
+ synchronized long getRemaining() throws IOException {
+ long remaining = 0L;
+ for (int idx = 0; idx < volumes.length; idx++) {
+ remaining += volumes[idx].getAvailable();
}
+ return remaining;
+ }
- synchronized void getVolumeMap(HashMap<Block, FSVolume> volumeMap) {
- for (int idx = 0; idx < volumes.length; idx++) {
- volumes[idx].getVolumeMap(volumeMap);
- }
+ synchronized void getBlockInfo(TreeSet<Block> blockSet) {
+ for (int idx = 0; idx < volumes.length; idx++) {
+ volumes[idx].getBlockInfo(blockSet);
}
+ }
- synchronized void getBlockMap(HashMap<Block, File> blockMap) {
- for (int idx = 0; idx < volumes.length; idx++) {
- volumes[idx].getBlockMap(blockMap);
- }
+ synchronized void getVolumeMap(HashMap<Block, FSVolume> volumeMap) {
+ for (int idx = 0; idx < volumes.length; idx++) {
+ volumes[idx].getVolumeMap(volumeMap);
}
+ }
- synchronized void checkDirs() throws DiskErrorException {
- for (int idx = 0; idx < volumes.length; idx++) {
- volumes[idx].checkDirs();
- }
+ synchronized void getBlockMap(HashMap<Block, File> blockMap) {
+ for (int idx = 0; idx < volumes.length; idx++) {
+ volumes[idx].getBlockMap(blockMap);
}
+ }
- public String toString() {
- StringBuffer sb = new StringBuffer();
- for (int idx = 0; idx < volumes.length; idx++) {
- sb.append(volumes[idx].toString());
- if (idx != volumes.length - 1) { sb.append(","); }
- }
- return sb.toString();
+ synchronized void checkDirs() throws DiskErrorException {
+ for (int idx = 0; idx < volumes.length; idx++) {
+ volumes[idx].checkDirs();
}
}
- //////////////////////////////////////////////////////
+
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ for (int idx = 0; idx < volumes.length; idx++) {
+ sb.append(volumes[idx].toString());
+ if (idx != volumes.length - 1) { sb.append(","); }
+ }
+ return sb.toString();
+ }
+ }
+ //////////////////////////////////////////////////////
+ //
+ // FSDataSet
+ //
+ //////////////////////////////////////////////////////
+
+ FSVolumeSet volumes;
+ private HashMap<Block,File> ongoingCreates = new HashMap<Block,File>();
+ private int maxBlocksPerDir = 0;
+ private HashMap<Block,FSVolume> volumeMap = null;
+ private HashMap<Block,File> blockMap = null;
+ static Random random = new Random();
+
+ /**
+ * An FSDataset has a directory where it loads its data files.
+ */
+ public FSDataset( DataStorage storage, Configuration conf) throws IOException {
+ this.maxBlocksPerDir = conf.getInt("dfs.datanode.numblocks", 64);
+ FSVolume[] volArray = new FSVolume[storage.getNumStorageDirs()];
+ for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
+ volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(), conf);
+ }
+ volumes = new FSVolumeSet(volArray);
+ volumeMap = new HashMap<Block,FSVolume>();
+ volumes.getVolumeMap(volumeMap);
+ blockMap = new HashMap<Block,File>();
+ volumes.getBlockMap(blockMap);
+ }
+
+ /**
+ * Return total capacity, used and unused
+ */
+ public long getCapacity() throws IOException {
+ return volumes.getCapacity();
+ }
+
+ /**
+ * Return how many bytes can still be stored in the FSDataset
+ */
+ public long getRemaining() throws IOException {
+ return volumes.getRemaining();
+ }
+
+ /**
+ * Find the block's on-disk length
+ */
+ public long getLength(Block b) throws IOException {
+ if (! isValidBlock(b)) {
+ throw new IOException("Block " + b + " is not valid.");
+ }
+ File f = getFile(b);
+ return f.length();
+ }
+
+ /**
+ * Get a stream of data from the indicated block.
+ */
+ public synchronized InputStream getBlockData(Block b) throws IOException {
+ if (! isValidBlock(b)) {
+ throw new IOException("Block " + b + " is not valid.");
+ }
+ // File should be opened with the lock.
+ return new FileInputStream(getFile(b));
+ }
+
+ /**
+ * Start writing to a block file
+ */
+ public OutputStream writeToBlock(Block b) throws IOException {
//
- // FSDataSet
+ // Make sure the block isn't a valid one - we're still creating it!
//
- //////////////////////////////////////////////////////
-
- FSVolumeSet volumes;
- private HashMap<Block,File> ongoingCreates = new HashMap<Block,File>();
- private int maxBlocksPerDir = 0;
- private HashMap<Block,FSVolume> volumeMap = null;
- private HashMap<Block,File> blockMap = null;
- static Random random = new Random();
-
- /**
- * An FSDataset has a directory where it loads its data files.
- */
- public FSDataset( DataStorage storage, Configuration conf) throws IOException {
- this.maxBlocksPerDir = conf.getInt("dfs.datanode.numblocks", 64);
- FSVolume[] volArray = new FSVolume[storage.getNumStorageDirs()];
- for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
- volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(), conf);
- }
- volumes = new FSVolumeSet(volArray);
- volumeMap = new HashMap<Block,FSVolume>();
- volumes.getVolumeMap(volumeMap);
- blockMap = new HashMap<Block,File>();
- volumes.getBlockMap(blockMap);
- }
-
- /**
- * Return total capacity, used and unused
- */
- public long getCapacity() throws IOException {
- return volumes.getCapacity();
- }
-
- /**
- * Return how many bytes can still be stored in the FSDataset
- */
- public long getRemaining() throws IOException {
- return volumes.getRemaining();
- }
-
- /**
- * Find the block's on-disk length
- */
- public long getLength(Block b) throws IOException {
- if (! isValidBlock(b)) {
- throw new IOException("Block " + b + " is not valid.");
- }
- File f = getFile(b);
- return f.length();
- }
-
- /**
- * Get a stream of data from the indicated block.
- */
- public synchronized InputStream getBlockData(Block b) throws IOException {
- if (! isValidBlock(b)) {
- throw new IOException("Block " + b + " is not valid.");
- }
- // File should be opened with the lock.
- return new FileInputStream(getFile(b));
- }
-
- /**
- * Start writing to a block file
- */
- public OutputStream writeToBlock(Block b) throws IOException {
- //
- // Make sure the block isn't a valid one - we're still creating it!
- //
- if (isValidBlock(b)) {
- throw new IOException("Block " + b + " is valid, and cannot be written to.");
- }
- long blockSize = b.getNumBytes();
-
- //
- // Serialize access to /tmp, and check if file already there.
- //
- File f = null;
- synchronized ( this ) {
- //
- // Is it already in the create process?
- //
- if (ongoingCreates.containsKey(b)) {
- // check how old is the temp file - wait 1 hour
- File tmp = (File)ongoingCreates.get(b);
- if ((System.currentTimeMillis() - tmp.lastModified()) < 3600 * 1000) {
- throw new IOException("Block " + b +
- " has already been started (though not completed), and thus cannot be created.");
- } else {
- // stale temp file - remove
- if (!tmp.delete()) {
- throw new IOException("Can't write the block - unable to remove stale temp file " + tmp);
- }
- ongoingCreates.remove(b);
- }
- }
- FSVolume v = null;
- synchronized ( volumes ) {
- v = volumes.getNextVolume(blockSize);
- // create temporary file to hold block in the designated volume
- f = v.createTmpFile(b);
- }
- ongoingCreates.put(b, f);
- volumeMap.put(b, v);
- }
-
- //
- // Finally, allow a writer to the block file
- // REMIND - mjc - make this a filter stream that enforces a max
- // block size, so clients can't go crazy
- //
- return new FileOutputStream(f);
+ if (isValidBlock(b)) {
+ throw new IOException("Block " + b + " is valid, and cannot be written to.");
}
+ long blockSize = b.getNumBytes();
//
- // REMIND - mjc - eventually we should have a timeout system
- // in place to clean up block files left by abandoned clients.
- // We should have some timer in place, so that if a blockfile
- // is created but non-valid, and has been idle for >48 hours,
- // we can GC it safely.
+ // Serialize access to /tmp, and check if file already there.
//
-
- /**
- * Complete the block write!
- */
- public synchronized void finalizeBlock(Block b) throws IOException {
- File f = ongoingCreates.get(b);
- if (f == null || ! f.exists()) {
- throw new IOException("No temporary file " + f + " for block " + b);
- }
- long finalLen = f.length();
- b.setNumBytes(finalLen);
- FSVolume v = volumeMap.get(b);
-
- File dest = null;
- synchronized ( volumes ) {
- dest = v.addBlock(b, f);
- }
- blockMap.put(b, dest);
- ongoingCreates.remove(b);
- }
-
- /**
- * Return a table of block data
- */
- public Block[] getBlockReport() {
- TreeSet<Block> blockSet = new TreeSet<Block>();
- volumes.getBlockInfo(blockSet);
- Block blockTable[] = new Block[blockSet.size()];
- int i = 0;
- for (Iterator<Block> it = blockSet.iterator(); it.hasNext(); i++) {
- blockTable[i] = it.next();
- }
- return blockTable;
- }
-
- /**
- * Check whether the given block is a valid one.
- */
- public boolean isValidBlock(Block b) {
- File f = getFile(b);
- return (f!= null && f.exists());
- }
-
- /**
- * We're informed that a block is no longer valid. We
- * could lazily garbage-collect the block, but why bother?
- * just get rid of it.
- */
- public void invalidate(Block invalidBlks[]) throws IOException {
- for (int i = 0; i < invalidBlks.length; i++) {
- File f;
- synchronized (this) {
- f = getFile(invalidBlks[i]);
- if (f == null) {
- throw new IOException("Unexpected error trying to delete block "
- + invalidBlks[i] +
- ". Block not found in blockMap.");
- }
- FSVolume v = volumeMap.get(invalidBlks[i]);
- if (v == null) {
- throw new IOException("Unexpected error trying to delete block "
- + invalidBlks[i] +
- ". No volume for this block.");
- }
- File parent = f.getParentFile();
- if (parent == null) {
- throw new IOException("Unexpected error trying to delete block "
- + invalidBlks[i] +
- ". Parent not found for file " + f + ".");
- }
- v.clearPath(parent);
- blockMap.remove(invalidBlks[i]);
- volumeMap.remove(invalidBlks[i]);
- }
- if (!f.delete()) {
- throw new IOException("Unexpected error trying to delete block "
- + invalidBlks[i] + " at file " + f);
- }
- DataNode.LOG.info("Deleting block " + invalidBlks[i]);
+ File f = null;
+ synchronized ( this ) {
+ //
+ // Is it already in the create process?
+ //
+ if (ongoingCreates.containsKey(b)) {
+ // check how old is the temp file - wait 1 hour
+ File tmp = (File)ongoingCreates.get(b);
+ if ((System.currentTimeMillis() - tmp.lastModified()) < 3600 * 1000) {
+ throw new IOException("Block " + b +
+ " has already been started (though not completed), and thus cannot be created.");
+ } else {
+ // stale temp file - remove
+ if (!tmp.delete()) {
+ throw new IOException("Can't write the block - unable to remove stale temp file " + tmp);
+ }
+ ongoingCreates.remove(b);
+ }
+ }
+ FSVolume v = null;
+ synchronized ( volumes ) {
+ v = volumes.getNextVolume(blockSize);
+ // create temporary file to hold block in the designated volume
+ f = v.createTmpFile(b);
}
+ ongoingCreates.put(b, f);
+ volumeMap.put(b, v);
}
- /**
- * Turn the block identifier into a filename.
- */
- synchronized File getFile(Block b) {
- return blockMap.get(b);
- }
+ //
+ // Finally, allow a writer to the block file
+ // REMIND - mjc - make this a filter stream that enforces a max
+ // block size, so clients can't go crazy
+ //
+ return new FileOutputStream(f);
+ }
- /**
- * check if a data diretory is healthy
- * @throws DiskErrorException
- * @author hairong
- */
- void checkDataDir() throws DiskErrorException {
- volumes.checkDirs();
- }
+ //
+ // REMIND - mjc - eventually we should have a timeout system
+ // in place to clean up block files left by abandoned clients.
+ // We should have some timer in place, so that if a blockfile
+ // is created but non-valid, and has been idle for >48 hours,
+ // we can GC it safely.
+ //
+
+ /**
+ * Complete the block write!
+ */
+ public synchronized void finalizeBlock(Block b) throws IOException {
+ File f = ongoingCreates.get(b);
+ if (f == null || ! f.exists()) {
+ throw new IOException("No temporary file " + f + " for block " + b);
+ }
+ long finalLen = f.length();
+ b.setNumBytes(finalLen);
+ FSVolume v = volumeMap.get(b);
+
+ File dest = null;
+ synchronized ( volumes ) {
+ dest = v.addBlock(b, f);
+ }
+ blockMap.put(b, dest);
+ ongoingCreates.remove(b);
+ }
+
+ /**
+ * Return a table of block data
+ */
+ public Block[] getBlockReport() {
+ TreeSet<Block> blockSet = new TreeSet<Block>();
+ volumes.getBlockInfo(blockSet);
+ Block blockTable[] = new Block[blockSet.size()];
+ int i = 0;
+ for (Iterator<Block> it = blockSet.iterator(); it.hasNext(); i++) {
+ blockTable[i] = it.next();
+ }
+ return blockTable;
+ }
+
+ /**
+ * Check whether the given block is a valid one.
+ */
+ public boolean isValidBlock(Block b) {
+ File f = getFile(b);
+ return (f!= null && f.exists());
+ }
+
+ /**
+ * We're informed that a block is no longer valid. We
+ * could lazily garbage-collect the block, but why bother?
+ * just get rid of it.
+ */
+ public void invalidate(Block invalidBlks[]) throws IOException {
+ for (int i = 0; i < invalidBlks.length; i++) {
+ File f;
+ synchronized (this) {
+ f = getFile(invalidBlks[i]);
+ if (f == null) {
+ throw new IOException("Unexpected error trying to delete block "
+ + invalidBlks[i] +
+ ". Block not found in blockMap.");
+ }
+ FSVolume v = volumeMap.get(invalidBlks[i]);
+ if (v == null) {
+ throw new IOException("Unexpected error trying to delete block "
+ + invalidBlks[i] +
+ ". No volume for this block.");
+ }
+ File parent = f.getParentFile();
+ if (parent == null) {
+ throw new IOException("Unexpected error trying to delete block "
+ + invalidBlks[i] +
+ ". Parent not found for file " + f + ".");
+ }
+ v.clearPath(parent);
+ blockMap.remove(invalidBlks[i]);
+ volumeMap.remove(invalidBlks[i]);
+ }
+ if (!f.delete()) {
+ throw new IOException("Unexpected error trying to delete block "
+ + invalidBlks[i] + " at file " + f);
+ }
+ DataNode.LOG.info("Deleting block " + invalidBlks[i]);
+ }
+ }
+
+ /**
+ * Turn the block identifier into a filename.
+ */
+ synchronized File getFile(Block b) {
+ return blockMap.get(b);
+ }
+
+ /**
+ * check if a data diretory is healthy
+ * @throws DiskErrorException
+ * @author hairong
+ */
+ void checkDataDir() throws DiskErrorException {
+ volumes.checkDirs();
+ }
- public String toString() {
- return "FSDataset{dirpath='"+volumes+"'}";
- }
+ public String toString() {
+ return "FSDataset{dirpath='"+volumes+"'}";
+ }
}