You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2011/04/15 06:29:47 UTC
svn commit: r1092584 - in /hadoop/hdfs/trunk: ./
src/java/org/apache/hadoop/hdfs/
src/java/org/apache/hadoop/hdfs/server/namenode/
src/test/hdfs/org/apache/hadoop/hdfs/
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/
Author: suresh
Date: Fri Apr 15 04:29:46 2011
New Revision: 1092584
URL: http://svn.apache.org/viewvc?rev=1092584&view=rev
Log:
HDFS-1831. Fix append bug in FileContext and implement CreateFlag checks. Contributed by Suresh Srinivas.
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1092584&r1=1092583&r2=1092584&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Fri Apr 15 04:29:46 2011
@@ -184,6 +184,9 @@ Trunk (unreleased changes)
HDFS-1824. delay instantiation of file system object until it is
needed (linked to HADOOP-7207) (boryas)
+ HDFS-1831. Fix append bug in FileContext and implement CreateFlag
+ check (related to HADOOP-7223). (suresh)
+
Release 0.22.0 - Unreleased
NEW FEATURES
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1092584&r1=1092583&r2=1092584&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Fri Apr 15 04:29:46 2011
@@ -573,8 +573,9 @@ public class DFSClient implements FSCons
int buffersize)
throws IOException {
return create(src, FsPermission.getDefault(),
- overwrite ? EnumSet.of(CreateFlag.OVERWRITE) : EnumSet.of(CreateFlag.CREATE),
- replication, blockSize, progress, buffersize);
+ overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+ : EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress,
+ buffersize);
}
/**
@@ -640,9 +641,29 @@ public class DFSClient implements FSCons
}
/**
+ * Append to an existing file if {@link CreateFlag#APPEND} is present
+ */
+ private OutputStream primitiveAppend(String src, EnumSet<CreateFlag> flag,
+ int buffersize, Progressable progress) throws IOException {
+ if (flag.contains(CreateFlag.APPEND)) {
+ HdfsFileStatus stat = getFileInfo(src);
+ if (stat == null) { // No file to append to
+ // New file needs to be created if create option is present
+ if (!flag.contains(CreateFlag.CREATE)) {
+ throw new FileNotFoundException("failed to append to non-existent file "
+ + src + " on client " + clientName);
+ }
+ return null;
+ }
+ return callAppend(stat, src, buffersize, progress);
+ }
+ return null;
+ }
+
+ /**
* Same as {{@link #create(String, FsPermission, EnumSet, short, long,
* Progressable, int)} except that the permission
- * is absolute (ie has already been masked with umask.
+ * is absolute (ie has already been masked with umask.
*/
public OutputStream primitiveCreate(String src,
FsPermission absPermission,
@@ -655,9 +676,13 @@ public class DFSClient implements FSCons
int bytesPerChecksum)
throws IOException, UnresolvedLinkException {
checkOpen();
- OutputStream result = new DFSOutputStream(this, src, absPermission,
- flag, createParent, replication, blockSize, progress, buffersize,
- bytesPerChecksum);
+ CreateFlag.validate(flag);
+ OutputStream result = primitiveAppend(src, flag, buffersize, progress);
+ if (result == null) {
+ result = new DFSOutputStream(this, src, absPermission,
+ flag, createParent, replication, blockSize, progress, buffersize,
+ bytesPerChecksum);
+ }
leasechecker.put(src, result);
return result;
}
@@ -699,23 +724,11 @@ public class DFSClient implements FSCons
}
}
- /**
- * Append to an existing HDFS file.
- *
- * @param src file name
- * @param buffersize buffer size
- * @param progress for reporting write-progress
- * @return an output stream for writing into the file
- *
- * @see ClientProtocol#append(String, String)
- */
- OutputStream append(String src, int buffersize, Progressable progress)
- throws IOException {
- checkOpen();
- HdfsFileStatus stat = null;
+ /** Method to get stream returned by append call */
+ private OutputStream callAppend(HdfsFileStatus stat, String src,
+ int buffersize, Progressable progress) throws IOException {
LocatedBlock lastBlock = null;
try {
- stat = getFileInfo(src);
lastBlock = namenode.append(src, clientName);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
@@ -725,9 +738,26 @@ public class DFSClient implements FSCons
UnsupportedOperationException.class,
UnresolvedPathException.class);
}
- OutputStream result = new DFSOutputStream(this, src, buffersize, progress,
+ return new DFSOutputStream(this, src, buffersize, progress,
lastBlock, stat, conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
+ }
+
+ /**
+ * Append to an existing HDFS file.
+ *
+ * @param src file name
+ * @param buffersize buffer size
+ * @param progress for reporting write-progress
+ * @return an output stream for writing into the file
+ *
+ * @see ClientProtocol#append(String, String)
+ */
+ OutputStream append(String src, int buffersize, Progressable progress)
+ throws IOException {
+ checkOpen();
+ HdfsFileStatus stat = getFileInfo(src);
+ OutputStream result = callAppend(stat, src, buffersize, progress);
leasechecker.put(src, result);
return result;
}
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1092584&r1=1092583&r2=1092584&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Fri Apr 15 04:29:46 2011
@@ -243,9 +243,9 @@ public class DistributedFileSystem exten
Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
return new FSDataOutputStream(dfs.create(getPathName(f), permission,
- overwrite ? EnumSet.of(CreateFlag.OVERWRITE) : EnumSet.of(CreateFlag.CREATE),
- replication, blockSize, progress, bufferSize),
- statistics);
+ overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+ : EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress,
+ bufferSize), statistics);
}
@SuppressWarnings("deprecation")
@@ -267,6 +267,9 @@ public class DistributedFileSystem exten
EnumSet<CreateFlag> flag, int bufferSize, short replication,
long blockSize, Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
+ if (flag.contains(CreateFlag.OVERWRITE)) {
+ flag.add(CreateFlag.CREATE);
+ }
return new FSDataOutputStream(dfs.create(getPathName(f), permission, flag,
false, replication, blockSize, progress, bufferSize), statistics);
}
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1092584&r1=1092583&r2=1092584&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Apr 15 04:29:46 2011
@@ -1285,9 +1285,10 @@ class FSDirectory implements Closeable {
* Check whether the path specifies a directory
*/
boolean isDir(String src) throws UnresolvedLinkException {
+ src = normalizePath(src);
readLock();
try {
- INode node = rootDir.getNode(normalizePath(src), false);
+ INode node = rootDir.getNode(src, false);
return node != null && node.isDirectory();
} finally {
readUnlock();
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1092584&r1=1092583&r2=1092584&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Apr 15 04:29:46 2011
@@ -1315,22 +1315,16 @@ public class FSNamesystem implements FSC
long blockSize) throws SafeModeException, FileAlreadyExistsException,
AccessControlException, UnresolvedLinkException, FileNotFoundException,
ParentNotDirectoryException, IOException {
- writeLock();
- try {
- boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
- boolean append = flag.contains(CreateFlag.APPEND);
- boolean create = flag.contains(CreateFlag.CREATE);
-
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
+ ", holder=" + holder
+ ", clientMachine=" + clientMachine
+ ", createParent=" + createParent
+ ", replication=" + replication
- + ", overwrite=" + overwrite
- + ", append=" + append);
+ + ", createFlag=" + flag.toString());
}
-
+ writeLock();
+ try {
if (isInSafeMode())
throw new SafeModeException("Cannot create file" + src, safeMode);
if (!DFSUtil.isValidName(src)) {
@@ -1340,14 +1334,16 @@ public class FSNamesystem implements FSC
// Verify that the destination does not exist as a directory already.
boolean pathExists = dir.exists(src);
if (pathExists && dir.isDir(src)) {
- throw new FileAlreadyExistsException("Cannot create file "+ src + "; already exists as a directory.");
+ throw new FileAlreadyExistsException("Cannot create file " + src
+ + "; already exists as a directory.");
}
+ boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
+ boolean append = flag.contains(CreateFlag.APPEND);
if (isPermissionEnabled) {
if (append || (overwrite && pathExists)) {
checkPathAccess(src, FsAction.WRITE);
- }
- else {
+ } else {
checkAncestorAccess(src, FsAction.WRITE);
}
}
@@ -1420,34 +1416,27 @@ public class FSNamesystem implements FSC
} catch(IOException e) {
throw new IOException("failed to create "+e.getMessage());
}
- if (append) {
- if (myFile == null) {
- if(!create)
- throw new FileNotFoundException("failed to append to non-existent file "
- + src + " on client " + clientMachine);
- else {
- //append & create a nonexist file equals to overwrite
- return startFileInternal(src, permissions, holder, clientMachine,
- EnumSet.of(CreateFlag.OVERWRITE), createParent, replication, blockSize);
- }
- } else if (myFile.isDirectory()) {
- throw new IOException("failed to append to directory " + src
- +" on client " + clientMachine);
+ boolean create = flag.contains(CreateFlag.CREATE);
+ if (myFile == null) {
+ if (!create) {
+ throw new FileNotFoundException("failed to overwrite or append to non-existent file "
+ + src + " on client " + clientMachine);
}
- } else if (!dir.isValidToCreate(src)) {
+ } else {
+ // File exists - must be one of append or overwrite
if (overwrite) {
delete(src, true);
- } else {
- throw new IOException("failed to create file " + src
- +" on client " + clientMachine
- +" either because the filename is invalid or the file exists");
+ } else if (!append) {
+ throw new FileAlreadyExistsException("failed to create file " + src
+ + " on client " + clientMachine
+ + " because the file exists");
}
}
DatanodeDescriptor clientNode =
host2DataNodeMap.getDatanodeByHost(clientMachine);
- if (append) {
+ if (append && myFile != null) {
//
// Replace current node with a INodeUnderConstruction.
// Recreate in-memory lease record.
@@ -2712,7 +2701,6 @@ public class FSNamesystem implements FSC
* Get registrationID for datanodes based on the namespaceID.
*
* @see #registerDatanode(DatanodeRegistration)
- * @see FSImage#newNamespaceID()
* @return registration ID
*/
public String getRegistrationID() {
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java?rev=1092584&r1=1092583&r2=1092584&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java Fri Apr 15 04:29:46 2011
@@ -632,7 +632,8 @@ public class TestFileCreation extends ju
expectedException != null
&& expectedException instanceof FileNotFoundException);
- EnumSet<CreateFlag> overwriteFlag = EnumSet.of(CreateFlag.OVERWRITE);
+ EnumSet<CreateFlag> overwriteFlag =
+ EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
// Overwrite a file in root dir, should succeed
out = createNonRecursive(fs, path, 1, overwriteFlag);
out.close();
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java?rev=1092584&r1=1092583&r2=1092584&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java Fri Apr 15 04:29:46 2011
@@ -26,6 +26,7 @@ import java.util.Map;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
@@ -202,19 +203,12 @@ public class TestLeaseRecovery2 {
try {
dfs2.create(filepath, false, BUF_SIZE, REPLICATION_NUM, BLOCK_SIZE);
fail("Creation of an existing file should never succeed.");
+ } catch (FileAlreadyExistsException ex) {
+ done = true;
+ } catch (AlreadyBeingCreatedException ex) {
+ AppendTestUtil.LOG.info("GOOD! got " + ex.getMessage());
} catch (IOException ioe) {
- final String message = ioe.getMessage();
- if (message.contains("file exists")) {
- AppendTestUtil.LOG.info("done", ioe);
- done = true;
- }
- else if (message.contains(
- AlreadyBeingCreatedException.class.getSimpleName())) {
- AppendTestUtil.LOG.info("GOOD! got " + message);
- }
- else {
- AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
- }
+ AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
}
if (!done) {
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java?rev=1092584&r1=1092583&r2=1092584&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java Fri Apr 15 04:29:46 2011
@@ -554,7 +554,7 @@ public class NNThroughputBenchmark {
// dummyActionNoSynch(fileIdx);
nameNode.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
clientName, new EnumSetWritable<CreateFlag>(EnumSet
- .of(CreateFlag.OVERWRITE)), true, replication, BLOCK_SIZE);
+ .of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication, BLOCK_SIZE);
long end = System.currentTimeMillis();
for(boolean written = !closeUponCreate; !written;
written = nameNode.complete(fileNames[daemonId][inputIdx],
@@ -967,7 +967,7 @@ public class NNThroughputBenchmark {
for(int idx=0; idx < nrFiles; idx++) {
String fileName = nameGenerator.getNextFileName("ThroughputBench");
nameNode.create(fileName, FsPermission.getDefault(), clientName,
- new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.OVERWRITE)), true, replication,
+ new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication,
BLOCK_SIZE);
Block lastBlock = addBlocks(fileName, clientName);
nameNode.complete(fileName, clientName, lastBlock);