You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2007/09/20 00:12:50 UTC
svn commit: r577456 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/fs/
src/test/org/apache/hadoop/dfs/
Author: dhruba
Date: Wed Sep 19 15:12:49 2007
New Revision: 577456
URL: http://svn.apache.org/viewvc?rev=577456&view=rev
Log:
HADOOP-89. A client can access file data even before the creator
has closed the file. Introduce a new dfs shell command "tail".
(Dhruba Borthakur via dhruba)
Removed:
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FileUnderConstruction.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingCreates.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/INode.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=577456&r1=577455&r2=577456&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Sep 19 15:12:49 2007
@@ -35,6 +35,10 @@
NEW FEATURES
+ HADOOP-89. A client can access file data even before the creator
+ has closed the file. Introduce a new command "tail" from dfs shell.
+ (Dhruba Borthakur via dhruba)
+
HADOOP-1636. Allow configuration of the number of jobs kept in
memory by the JobTracker. (Michael Bieniosek via omalley)
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java?rev=577456&r1=577455&r2=577456&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java Wed Sep 19 15:12:49 2007
@@ -25,6 +25,7 @@
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.dfs.BlocksMap.BlockInfo;
/*************************************************
* FSDirectory stores the filesystem directory state.
@@ -114,31 +115,43 @@
/**
* Add the given filename to the fs.
*/
- public boolean addFile(String path, Block[] blocks, short replication,
- long preferredBlockSize) {
+ INode addFile(String path,
+ short replication,
+ long preferredBlockSize,
+ String clientName,
+ String clientMachine,
+ DatanodeDescriptor clientNode)
+ throws IOException {
waitForReady();
// Always do an implicit mkdirs for parent directory tree.
long modTime = FSNamesystem.now();
if (!mkdirs(new Path(path).getParent().toString(), modTime)) {
- return false;
+ return null;
+ }
+ INodeFile newNode = new INodeFileUnderConstruction(replication,
+ preferredBlockSize, modTime, clientName,
+ clientMachine, clientNode);
+ synchronized (rootDir) {
+ try {
+ newNode = rootDir.addNode(path, newNode);
+ } catch (FileNotFoundException e) {
+ newNode = null;
+ }
}
- INodeFile newNode = (INodeFile)unprotectedAddFile(path, blocks, replication,
- modTime,
- preferredBlockSize);
if (newNode == null) {
NameNode.stateChangeLog.info("DIR* FSDirectory.addFile: "
- +"failed to add "+path+" with "
- +blocks.length+" blocks to the file system");
- return false;
+ +"failed to add "+path
+ +" to the file system");
+ return null;
}
// add create file record to log
fsImage.getEditLog().logCreateFile(path, newNode);
NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
- +path+" with "+blocks.length+" blocks is added to the file system");
- return true;
+ +path+" is added to the file system");
+ return newNode;
}
-
+
/**
*/
INode unprotectedAddFile( String path,
@@ -171,28 +184,64 @@
}
/**
- * Add blocks to the file.
+ * Add a block to the file. Returns a reference to the added block.
+ */
+ Block addBlock(String path, INode file, Block block) throws IOException {
+ waitForReady();
+
+ synchronized (rootDir) {
+ INodeFile fileNode = (INodeFile) file;
+
+ // associate the new list of blocks with this file
+ namesystem.blocksMap.addINode(block, fileNode);
+ BlockInfo blockInfo = namesystem.blocksMap.getStoredBlock(block);
+ fileNode.addBlock(blockInfo);
+
+ NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
+ + path + " with " + block
+ + " block is added to the in-memory "
+ + "file system");
+ }
+ return block;
+ }
+
+ /**
+ * Persist the block list for the inode.
+ */
+ void persistBlocks(String path, INode file) throws IOException {
+ waitForReady();
+
+ synchronized (rootDir) {
+ INodeFile fileNode = (INodeFile) file;
+
+ // create two transactions. The first one deletes the empty
+ // file and the second transaction recreates the same file
+ // with the appropriate set of blocks.
+ fsImage.getEditLog().logDelete(path, fileNode.getModificationTime());
+
+ // re-add create file record to log
+ fsImage.getEditLog().logCreateFile(path, fileNode);
+ NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
+ +path+" with "+ fileNode.getBlocks().length
+ +" blocks is persisted to the file system");
+ }
+ }
+
+ /**
+ * Remove a block from the file.
*/
- boolean addBlocks(String path, Block[] blocks) throws IOException {
+ boolean removeBlock(String path, INode file, Block block) throws IOException {
waitForReady();
synchronized (rootDir) {
- INodeFile fileNode = this.getFileINode(path);
+ INodeFile fileNode = (INodeFile) file;
if (fileNode == null) {
throw new IOException("Unknown file: " + path);
}
- if (fileNode.getBlocks() != null &&
- fileNode.getBlocks().length != 0) {
- throw new IOException("Cannot add new blocks to " +
- "already existing file.");
- }
- // associate the new list of blocks with this file
- fileNode.allocateBlocks(blocks.length);
- for (int i = 0; i < blocks.length; i++) {
- fileNode.setBlock(i,
- namesystem.blocksMap.addINode(blocks[i], fileNode));
- }
+ // modify file-> block and blocksMap
+ fileNode.removeBlock(block);
+ namesystem.blocksMap.removeINode(block);
// create two transactions. The first one deletes the empty
// file and the second transaction recreates the same file
@@ -202,8 +251,8 @@
// re-add create file record to log
fsImage.getEditLog().logCreateFile(path, fileNode);
NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
- +path+" with "+blocks.length
- +" blocks is added to the file system");
+ +path+" with "+block
+ +" block is added to the file system");
}
return true;
}
@@ -372,6 +421,28 @@
}
return v.toArray(new Block[v.size()]);
}
+ }
+ }
+ }
+
+ /**
+ * Replaces the specified inode with the specified one.
+ */
+ void replaceNode(String path, INodeFile oldnode, INodeFile newnode)
+ throws IOException {
+ synchronized (rootDir) {
+ //
+ // Remove the node from the namespace
+ //
+ if (!oldnode.removeNode()) {
+ NameNode.stateChangeLog.warn("DIR* FSDirectory.replaceNode: " +
+ "failed to remove " + path);
+ throw new IOException("FSDirectory.replaceNode: " +
+ "failed to remove " + path);
+ }
+ rootDir.addNode(path, newnode);
+ for (Block b : newnode.getBlocks()) {
+ namesystem.blocksMap.addINode(b, newnode);
}
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=577456&r1=577455&r2=577456&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Wed Sep 19 15:12:49 2007
@@ -104,12 +104,6 @@
new TreeMap<String, Collection<Block>>();
//
- // Keeps track of files that are being created, plus the
- // blocks that make them up.
- //
- PendingCreates pendingCreates = new PendingCreates();
-
- //
// Stats on overall usage
//
long totalCapacity = 0L, totalUsed=0L, totalRemaining = 0L;
@@ -719,7 +713,15 @@
throw new IOException(
text + " is less than the required minimum " + minReplication);
}
-
+
+ void startFile(String src, String holder, String clientMachine,
+ boolean overwrite, short replication, long blockSize
+ ) throws IOException {
+ startFileInternal(src, holder, clientMachine, overwrite,
+ replication, blockSize);
+ getEditLog().logSync();
+ }
+
/**
* The client would like to create a new block for the indicated
* filename. Return an array that consists of the block, plus a set
@@ -731,7 +733,7 @@
* @throws IOException if the filename is invalid
* {@link FSDirectory#isValidToCreate(String)}.
*/
- synchronized void startFile(String src,
+ synchronized void startFileInternal(String src,
String holder,
String clientMachine,
boolean overwrite,
@@ -746,10 +748,11 @@
throw new IOException("Invalid file name: " + src);
}
try {
- FileUnderConstruction pendingFile = pendingCreates.get(src);
- if (pendingFile != null) {
+ INode myFile = dir.getFileINode(src);
+ if (myFile != null && (myFile instanceof INodeFileUnderConstruction)) {
+ INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) myFile;
//
- // If the file exists in pendingCreate, then it must be in our
+ // If the file is under construction, then it must be in our
// leases. Find the appropriate lease record.
//
Lease lease = getLease(holder);
@@ -814,15 +817,6 @@
DatanodeDescriptor clientNode =
host2DataNodeMap.getDatanodeByHost(clientMachine);
- // Reserve space for this pending file
- pendingCreates.put(src,
- new FileUnderConstruction(replication,
- blockSize,
- holder,
- clientMachine,
- clientNode));
- NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
- +"add "+src+" to pendingCreates for "+holder);
synchronized (leases) {
Lease lease = getLease(holder);
if (lease == null) {
@@ -836,20 +830,27 @@
}
lease.startedCreate(src);
}
+
+ //
+ // Now we can add the name to the filesystem. This file has no
+ // blocks associated with it.
+ //
+ INode newNode = dir.addFile(src, replication, blockSize,
+ holder,
+ clientMachine,
+ clientNode);
+ if (newNode == null) {
+ throw new IOException("DIR* NameSystem.startFile: " +
+ "Unable to add file to namespace.");
+ }
} catch (IOException ie) {
NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
+ie.getMessage());
throw ie;
}
- //
- // Now we can add the name to the filesystem. This file has no
- // blocks associated with it.
- //
- if (!dir.addFile(src, new Block[0], replication, blockSize)) {
- throw new IOException("DIR* NameSystem.startFile: " +
- "Unable to add file to namespace.");
- }
+ NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
+ +"add "+src+" to namespace for "+holder);
}
/**
@@ -882,7 +883,7 @@
//
// make sure that we still have the lease on this file
//
- FileUnderConstruction pendingFile = pendingCreates.get(src);
+ INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) dir.getFileINode(src);
if (pendingFile == null) {
throw new LeaseExpiredException("No lease on " + src);
}
@@ -898,11 +899,11 @@
if (!checkFileProgress(pendingFile, false)) {
throw new NotReplicatedYetException("Not replicated yet:" + src);
}
- fileLength = pendingFile.computeFileLength();
- blockSize = pendingFile.getBlockSize();
+ fileLength = pendingFile.computeContentsLength();
+ blockSize = pendingFile.getPreferredBlockSize();
clientNode = pendingFile.getClientNode();
replication = (int)pendingFile.getReplication();
- newBlock = allocateBlock(src);
+ newBlock = allocateBlock(src, pendingFile);
}
DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
@@ -928,13 +929,14 @@
//
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+b.getBlockName()+"of file "+src);
- boolean status = pendingCreates.removeBlock(src, b);
- if (status) {
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+ INode file = dir.getFileINode(src);
+ if (file != null) {
+ dir.removeBlock(src, file, b);
+ }
+ NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+ b.getBlockName()
+ " is removed from pendingCreates");
- }
- return status;
+ return true;
}
/**
@@ -964,8 +966,7 @@
}
/**
- * Finalize the created file and make it world-accessible. The
- * FSNamesystem will already know the blocks that make up the file.
+ * The FSNamesystem will already know the blocks that make up the file.
* Before we return, we make sure that all the file's blocks have
* been reported by datanodes and are replicated correctly.
*/
@@ -980,11 +981,11 @@
NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder);
if (isInSafeMode())
throw new SafeModeException("Cannot complete file " + src, safeMode);
- FileUnderConstruction pendingFile = pendingCreates.get(src);
+ INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) dir.getFileINode(src);
Block[] fileBlocks = dir.getFileBlocks(src);
- if ((fileBlocks != null && fileBlocks.length > 0) ||
- pendingFile == null) {
+ if (fileBlocks == null || fileBlocks.length == 0 ||
+ pendingFile == null) {
NameNode.stateChangeLog.warn("DIR* NameSystem.completeFile: "
+ "failed to complete " + src
+ " because dir.getFileBlocks() is " +
@@ -999,36 +1000,16 @@
return STILL_WAITING;
}
- Collection<Block> blocks = pendingFile.getBlocks();
- int nrBlocks = blocks.size();
- Block pendingBlocks[] = new Block[nrBlocks];
-
- //
- // We have the pending blocks, but they won't have
- // length info in them (as they were allocated before
- // data-write took place). Find the block stored in
- // node descriptor.
- //
- int idx = 0;
- for (Block b : blocks) {
- Block storedBlock = blocksMap.getStoredBlock(b);
- // according to checkFileProgress() every block is present & replicated
- assert storedBlock != null : "Missing block " + b.getBlockName();
- pendingBlocks[idx++] = storedBlock;
- }
-
- //
- // add blocks to the file
- //
- if (!dir.addBlocks(src, pendingBlocks)) {
- return OPERATION_FAILED;
- }
+ // The file is no longer pending.
+ // Create permanent INode, update blockmap
+ INodeFile newFile = pendingFile.convertToInodeFile();
+ dir.replaceNode(src, pendingFile, newFile);
- // The file is no longer pending
- pendingCreates.remove(src);
- NameNode.stateChangeLog.debug(
- "DIR* NameSystem.completeFile: " + src
- + " is removed from pendingCreates");
+ // persist block allocations for this file
+ dir.persistBlocks(src, newFile);
+
+ NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src
+ + " blocklist persisted");
synchronized (leases) {
Lease lease = getLease(holder);
@@ -1051,6 +1032,8 @@
// Now that the file is real, we need to be sure to replicate
// the blocks.
int numExpectedReplicas = pendingFile.getReplication();
+ Block[] pendingBlocks = pendingFile.getBlocks();
+ int nrBlocks = pendingBlocks.length;
for (int i = 0; i < nrBlocks; i++) {
// filter out containingNodes that are marked for decommission.
NumberReplicas number = countNodes(pendingBlocks[i]);
@@ -1069,15 +1052,14 @@
/**
* Allocate a block at the given pending filename
*/
- private Block allocateBlock(String src) throws IOException {
+ private Block allocateBlock(String src, INode file) throws IOException {
Block b = null;
do {
b = new Block(FSNamesystem.randBlockId.nextLong(), 0);
} while (isValidBlock(b));
- pendingCreates.addBlock(src, b);
+ b = dir.addBlock(src, file, b);
NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: "
- +src+ ". "+b.getBlockName()+
- " is created and added to pendingCreates and pendingCreateBlocks");
+ +src+ ". "+b.getBlockName());
return b;
}
@@ -1086,13 +1068,13 @@
* replicated. If not, return false. If checkall is true, then check
* all blocks, otherwise check only penultimate block.
*/
- synchronized boolean checkFileProgress(FileUnderConstruction v, boolean checkall) {
+ synchronized boolean checkFileProgress(INodeFile v, boolean checkall) {
if (checkall) {
//
// check all blocks of the file.
//
- for (Iterator<Block> it = v.getBlocks().iterator(); it.hasNext();) {
- if (blocksMap.numNodes(it.next()) < this.minReplication) {
+ for (Block block: v.getBlocks()) {
+ if (blocksMap.numNodes(block) < this.minReplication) {
return false;
}
}
@@ -1490,21 +1472,36 @@
}
/**
- * Release a pending file creation lock.
+ * Move a file that is being written to be immutable.
* @param src The filename
* @param holder The datanode that was creating the file
*/
private void internalReleaseCreate(String src, String holder) throws IOException {
- boolean status = pendingCreates.remove(src);
- if (status) {
- NameNode.stateChangeLog.debug("DIR* NameSystem.internalReleaseCreate: " + src
- + " is removed from pendingCreates for "
- + holder + " (failure)");
- } else {
- NameNode.stateChangeLog.warn("DIR* NameSystem.internalReleaseCreate: "
- + "attempt to release a create lock on "+ src
- + " that was not in pedingCreates");
+ INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) dir.getFileINode(src);
+
+ // The last block that was allocated might not have been used by the
+ // client. In this case, the size of the last block would be 0. A fsck
+ // will report this block as a missing block because no datanodes have it.
+ // Delete this block.
+ Block[] blocks = pendingFile.getBlocks();
+ if (blocks != null && blocks.length > 1) {
+ Block last = blocks[blocks.length - 1];
+ if (last.getNumBytes() == 0) {
+ pendingFile.removeBlock(last);
+ }
}
+
+ // The file is no longer pending.
+ // Create permanent INode, update blockmap
+ INodeFile newFile = pendingFile.convertToInodeFile();
+ dir.replaceNode(src, pendingFile, newFile);
+
+ // persist block allocations for this file
+ dir.persistBlocks(src, newFile);
+
+ NameNode.stateChangeLog.debug("DIR* NameSystem.internalReleaseCreate: " +
+ src + " is no longer written to by " +
+ holder);
}
/**
@@ -2161,7 +2158,18 @@
Block storedBlock = blocksMap.getStoredBlock(block); //extra look up!
if (storedBlock != null && block != storedBlock) {
if (block.getNumBytes() > 0) {
- storedBlock.setNumBytes(block.getNumBytes());
+ long cursize = storedBlock.getNumBytes();
+ if (cursize == 0) {
+ storedBlock.setNumBytes(block.getNumBytes());
+ } else if (cursize != block.getNumBytes()) {
+ LOG.warn("Inconsistent size for block " + block +
+ " reported from " + node.getName() +
+ " current size is " + cursize +
+ " reported size is " + block.getNumBytes());
+ // Accept this block even if there is a problem with its
+ // size. Clients should detect data corruption because of
+ // CRC mismatch.
+ }
}
block = storedBlock;
}
@@ -2185,8 +2193,13 @@
+ block.getBlockName() + " on " + node.getName());
}
- if (fileINode == null) // block does not belong to any file
+ //
+ // if file is being actively written to, then do not check
+ // replication-factor here. It will be checked when the file is closed.
+ //
+ if (fileINode == null || fileINode instanceof INodeFileUnderConstruction) {
return block;
+ }
// filter out containingNodes that are marked for decommission.
NumberReplicas num = countNodes(block);
@@ -3460,8 +3473,7 @@
* Returns whether the given block is one pointed-to by a file.
*/
private boolean isValidBlock(Block b) {
- return (blocksMap.getINode(b) != null ||
- pendingCreates.contains(b));
+ return (blocksMap.getINode(b) != null);
}
// Distributed upgrade manager
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/INode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/INode.java?rev=577456&r1=577455&r2=577456&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/INode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/INode.java Wed Sep 19 15:12:49 2007
@@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.Arrays;
import java.util.List;
+import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.dfs.BlocksMap.BlockInfo;
@@ -472,6 +473,14 @@
allocateBlocks(nrBlocks);
}
+ protected INodeFile(BlockInfo[] blklist, short replication, long modificationTime,
+ long preferredBlockSize) {
+ super(modificationTime);
+ this.blockReplication = replication;
+ this.preferredBlockSize = preferredBlockSize;
+ blocks = blklist;
+ }
+
boolean isDirectory() {
return false;
}
@@ -492,7 +501,7 @@
* Get file blocks
* @return file blocks
*/
- Block[] getBlocks() {
+ BlockInfo[] getBlocks() {
return this.blocks;
}
@@ -505,6 +514,45 @@
}
/**
+ * add a block to the block list
+ */
+ void addBlock(BlockInfo newblock) {
+ if (this.blocks == null) {
+ this.blocks = new BlockInfo[1];
+ this.blocks[0] = newblock;
+ } else {
+ int size = this.blocks.length;
+ BlockInfo[] newlist = new BlockInfo[size + 1];
+ for (int i = 0; i < size; i++) {
+ newlist[i] = this.blocks[i];
+ }
+ newlist[size] = newblock;
+ this.blocks = newlist;
+ }
+ }
+
+ /**
+ * remove a block from the block list. This block should be
+ * the last one on the list.
+ */
+ void removeBlock(Block oldblock) throws IOException {
+ if (this.blocks == null) {
+ throw new IOException("Trying to delete non-existant block " +
+ oldblock);
+ }
+ int size = this.blocks.length;
+ if (!this.blocks[size-1].equals(oldblock)) {
+ throw new IOException("Trying to delete non-existant block " +
+ oldblock);
+ }
+ BlockInfo[] newlist = new BlockInfo[size - 1];
+ for (int i = 0; i < size-1; i++) {
+ newlist[i] = this.blocks[i];
+ }
+ this.blocks = newlist;
+ }
+
+ /**
* Set file block
*/
void setBlock(int idx, BlockInfo blk) {
@@ -536,5 +584,58 @@
*/
long getPreferredBlockSize() {
return preferredBlockSize;
+ }
+
+ /**
+ * Return the penultimate allocated block for this file.
+ */
+ Block getPenultimateBlock() {
+ if (blocks == null || blocks.length <= 1) {
+ return null;
+ }
+ return blocks[blocks.length - 2];
+ }
+}
+
+class INodeFileUnderConstruction extends INodeFile {
+ protected StringBytesWritable clientName; // lease holder
+ protected StringBytesWritable clientMachine;
+ protected DatanodeDescriptor clientNode; // if client is a cluster node too.
+
+ INodeFileUnderConstruction(short replication,
+ long preferredBlockSize,
+ long modTime,
+ String clientName,
+ String clientMachine,
+ DatanodeDescriptor clientNode)
+ throws IOException {
+ super(0, replication, modTime, preferredBlockSize);
+ this.clientName = new StringBytesWritable(clientName);
+ this.clientMachine = new StringBytesWritable(clientMachine);
+ this.clientNode = clientNode;
+ }
+
+ String getClientName() throws IOException {
+ return clientName.getString();
+ }
+
+ String getClientMachine() throws IOException {
+ return clientMachine.getString();
+ }
+
+ DatanodeDescriptor getClientNode() {
+ return clientNode;
+ }
+
+ //
+ // converts an INodeFileUnderConstruction into an INodeFile
+ //
+ INodeFile convertToInodeFile() {
+ INodeFile obj = new INodeFile(getBlocks(),
+ getReplication(),
+ getModificationTime(),
+ getPreferredBlockSize());
+ return obj;
+
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java?rev=577456&r1=577455&r2=577456&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java Wed Sep 19 15:12:49 2007
@@ -44,6 +44,7 @@
modifFmt.setTimeZone(TimeZone.getTimeZone("UTC"));
}
static final String SETREP_SHORT_USAGE="-setrep [-R] [-w] <rep> <path/file>";
+ static final String TAIL_USAGE="-tail [-f] <file>";
private static final DecimalFormat decimalFormat =
new DecimalFormat("#*0.0#*");
@@ -881,6 +882,54 @@
}
/**
+ * Parse the incoming command string and show the last 1KB of the named file.
+ * @param cmd
+ * @param pos ignore anything before this pos in cmd
+ * @throws IOException
+ */
+ private void tail(String[] cmd, int pos) throws IOException {
+ CommandFormat c = new CommandFormat("tail", 1, 1, "f");
+ String src = null;
+ Path path = null;
+ short rep = 0;
+
+ try {
+ List<String> parameters = c.parse(cmd, pos);
+ src = parameters.get(0);
+ } catch(IllegalArgumentException iae) {
+ System.err.println("Usage: java FsShell " + TAIL_USAGE);
+ throw iae;
+ }
+ boolean foption = c.options.get("f") ? true: false;
+ path = new Path(src);
+
+ if (fs.isDirectory(path)) {
+ throw new IOException("Source must be a file.");
+ }
+
+ long fileSize = fs.getFileStatus(path).getLen();
+ long offset = (fileSize > 1024) ? fileSize - 1024: 0;
+
+ while (true) {
+ FSDataInputStream in = fs.open(path);
+ in.seek(offset);
+ IOUtils.copyBytes(in, System.out, 1024, false);
+ offset = in.getPos();
+ in.close();
+ if (!foption) {
+ break;
+ }
+ fileSize = fs.getFileStatus(path).getLen();
+ offset = (fileSize > offset) ? offset: fileSize;
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ }
+
+ /**
* Return an abbreviated English-language desc of the byte length
*/
public static String byteDesc(long len) {
@@ -926,6 +975,7 @@
"[-copyToLocal <src><localdst>] [-moveToLocal <src> <localdst>]\n\t" +
"[-mkdir <path>] [-report] [" + SETREP_SHORT_USAGE + "]\n\t" +
"[-touchz <path>] [-test -[ezd] <path>] [-stat [format] <path>]\n\t" +
+ "[-tail [-f] <path>]\n\t" +
"[-help [cmd]]\n";
String conf ="-conf <configuration file>: Specify an application configuration file.";
@@ -1025,6 +1075,10 @@
"\t\tin the specified format. Format accepts filesize in blocks (%b), filename (%n),\n" +
"\t\tblock size (%o), replication (%r), modification date (%y, %Y)\n";
+ String tail = TAIL_USAGE
+ + ": Show the last 1KB of the file. \n"
+ + "\t\tThe -f option shows apended data as the file grows. \n";
+
String help = "-help [cmd]: \tDisplays help for given command or all commands if none\n" +
"\t\tis specified.\n";
@@ -1078,6 +1132,8 @@
System.out.println(test);
} else if ("stat".equals(cmd)) {
System.out.println(stat);
+ } else if ("tail".equals(cmd)) {
+ System.out.println(tail);
} else if ("help".equals(cmd)) {
System.out.println(help);
} else {
@@ -1214,6 +1270,8 @@
} else if ("-stat".equals(cmd)) {
System.err.println("Usage: java FsShell" +
" [-stat [format] <path>]");
+ } else if ("-tail".equals(cmd)) {
+ System.err.println("Usage: java FsShell [" + TAIL_USAGE + "]");
} else {
System.err.println("Usage: java FsShell");
System.err.println(" [-ls <path>]");
@@ -1238,6 +1296,7 @@
System.err.println(" [-touchz <path>]");
System.err.println(" [-test -[ezd] <path>]");
System.err.println(" [-stat [format] <path>]");
+ System.err.println(" [" + TAIL_USAGE + "]");
System.err.println(" [-help [cmd]]");
System.err.println();
ToolRunner.printGenericCommandUsage(System.err);
@@ -1370,6 +1429,8 @@
} else {
printHelp("");
}
+ } else if ("-tail".equals(cmd)) {
+ tail(argv, i);
} else {
exitCode = -1;
System.err.println(cmd.substring(1) + ": Unknown command");
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java?rev=577456&r1=577455&r2=577456&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java Wed Sep 19 15:12:49 2007
@@ -27,14 +27,16 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
/**
- * This class tests the FileStatus API.
+ * This class tests that a file need not be closed before its
+ * data can be read by another client.
*/
public class TestFileCreation extends TestCase {
static final long seed = 0xDEADBEEFL;
static final int blockSize = 8192;
- static final int fileSize = 16384;
+ static final int fileSize = 2 * blockSize;
private static String TEST_ROOT_DIR =
new Path(System.getProperty("test.build.data","/tmp"))
@@ -78,11 +80,27 @@
}
}
}
+ FSDataInputStream stm = fileSys.open(name);
+ byte[] expected = new byte[fileSize];
+ Random rand = new Random(seed);
+ rand.nextBytes(expected);
+ // do a sanity check. Read the file
+ byte[] actual = new byte[fileSize];
+ stm.readFully(0, actual);
+ checkData(actual, 0, expected, "Read 1");
}
+ private void checkData(byte[] actual, int from, byte[] expected, String message) {
+ for (int idx = 0; idx < actual.length; idx++) {
+ this.assertEquals(message+" byte "+(from+idx)+" differs. expected "+
+ expected[from+idx]+" actual "+actual[idx],
+ actual[idx], expected[from+idx]);
+ actual[idx] = 0;
+ }
+ }
/**
- * Tests various options of File creation.
+ * Test that file data becomes available before file is closed.
*/
public void testFileCreation() throws IOException {
Configuration conf = new Configuration();
@@ -115,9 +133,13 @@
// write to file
writeFile(stm);
- // close file. This makes all file data visible to clients.
- stm.close();
+ // verify that file size has changed
+ assertTrue(file1 + " should be of size " + fileSize,
+ fs.getFileStatus(file1).getLen() == fileSize);
+
+ // Make sure a client can read it before it is closed.
checkFile(fs, file1, 1);
+ stm.close();
} finally {
fs.close();