Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2009/09/17 03:24:36 UTC

svn commit: r816022 [1/2] - in /hadoop/hdfs/branches/HDFS-265: ./ src/contrib/hdfsproxy/ src/docs/src/documentation/content/xdocs/ src/docs/src/documentation/resources/images/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/p...

Author: szetszwo
Date: Thu Sep 17 01:24:35 2009
New Revision: 816022

URL: http://svn.apache.org/viewvc?rev=816022&view=rev
Log:
Merge -r 814222:815964 from trunk.

Added:
    hadoop/hdfs/branches/HDFS-265/src/docs/src/documentation/content/xdocs/hdfsproxy.xml
      - copied unchanged from r815964, hadoop/hdfs/trunk/src/docs/src/documentation/content/xdocs/hdfsproxy.xml
    hadoop/hdfs/branches/HDFS-265/src/docs/src/documentation/resources/images/hdfsproxy-forward.jpg
      - copied unchanged from r815964, hadoop/hdfs/trunk/src/docs/src/documentation/resources/images/hdfsproxy-forward.jpg
    hadoop/hdfs/branches/HDFS-265/src/docs/src/documentation/resources/images/hdfsproxy-overview.jpg
      - copied unchanged from r815964, hadoop/hdfs/trunk/src/docs/src/documentation/resources/images/hdfsproxy-overview.jpg
    hadoop/hdfs/branches/HDFS-265/src/docs/src/documentation/resources/images/hdfsproxy-server.jpg
      - copied unchanged from r815964, hadoop/hdfs/trunk/src/docs/src/documentation/resources/images/hdfsproxy-server.jpg
    hadoop/hdfs/branches/HDFS-265/src/docs/src/documentation/resources/images/request-identify.jpg
      - copied unchanged from r815964, hadoop/hdfs/trunk/src/docs/src/documentation/resources/images/request-identify.jpg
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
      - copied unchanged from r815964, hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
      - copied unchanged from r815964, hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
      - copied unchanged from r815964, hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java
      - copied unchanged from r815964, hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java
Modified:
    hadoop/hdfs/branches/HDFS-265/   (props changed)
    hadoop/hdfs/branches/HDFS-265/CHANGES.txt
    hadoop/hdfs/branches/HDFS-265/build.xml   (props changed)
    hadoop/hdfs/branches/HDFS-265/src/contrib/hdfsproxy/   (props changed)
    hadoop/hdfs/branches/HDFS-265/src/contrib/hdfsproxy/README
    hadoop/hdfs/branches/HDFS-265/src/docs/src/documentation/content/xdocs/site.xml
    hadoop/hdfs/branches/HDFS-265/src/java/   (props changed)
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java   (props changed)
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/ReplicationTargetChooser.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/   (props changed)
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs-with-mr/   (props changed)
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSMkdirs.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
    hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java
    hadoop/hdfs/branches/HDFS-265/src/webapps/datanode/   (props changed)
    hadoop/hdfs/branches/HDFS-265/src/webapps/hdfs/   (props changed)
    hadoop/hdfs/branches/HDFS-265/src/webapps/secondary/   (props changed)

Propchange: hadoop/hdfs/branches/HDFS-265/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Sep 17 01:24:35 2009
@@ -1,2 +1,2 @@
 /hadoop/core/branches/branch-0.19/hdfs:713112
-/hadoop/hdfs/trunk:796829-800617,800619-803337,804756-805652,808672-809439,811495-813103,813105-813630
+/hadoop/hdfs/trunk:796829-800617,800619-803337,804756-805652,808672-809439,811495-813103,813105-813630,814223-815964

Modified: hadoop/hdfs/branches/HDFS-265/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/CHANGES.txt?rev=816022&r1=816021&r2=816022&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-265/CHANGES.txt Thu Sep 17 01:24:35 2009
@@ -88,6 +88,9 @@
     HDFS-235. Add support for byte ranges in HftpFileSystem to serve
     range of bytes from a file. (Bill Zeller via suresh)
 
+    HDFS-385. Add support for an experimental API that allows a module external
+    to HDFS to specify how HDFS blocks should be placed. (dhruba)
+
   IMPROVEMENTS
 
     HDFS-381. Remove blocks from DataNode maps when corresponding file
@@ -200,6 +203,13 @@
     HDFS-412. Hadoop JMX usage makes Nagios monitoring impossible.
     (Brian Bockelman via tomwhite)
 
+    HDFS-472. Update hdfsproxy documentation. Adds a setup guide and design
+    document. (Zhiyong Zhang via cdouglas)
+
+    HDFS-617. Support non-recursive create().  (Kan Zhang via szetszwo)
+
+    HDFS-618. Support non-recursive mkdir().  (Kan Zhang via szetszwo)
+
   BUG FIXES
 
     HDFS-76. Better error message to users when commands fail because of 
@@ -286,6 +296,10 @@
     HDFS-614. TestDatanodeBlockScanner obtains data directories directly from
     MiniHDFSCluster. (shv)
 
+    HDFS-612. Remove the use of org.mortbay.log.Log in FSDataset.  (szetszwo)
+
+    HDFS-622. checkMinReplication should count live nodes only. (shv)
+
 Release 0.20.1 - Unreleased
 
   IMPROVEMENTS

Propchange: hadoop/hdfs/branches/HDFS-265/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Sep 17 01:24:35 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/hdfs/build.xml:713112
 /hadoop/core/trunk/build.xml:779102
-/hadoop/hdfs/trunk/build.xml:796829-800617,800619-803337,804756-805652,808672-809439,811495-813103,813105-813630
+/hadoop/hdfs/trunk/build.xml:796829-800617,800619-803337,804756-805652,808672-809439,811495-813103,813105-813630,814223-815964

Propchange: hadoop/hdfs/branches/HDFS-265/src/contrib/hdfsproxy/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Sep 17 01:24:35 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/contrib/hdfsproxy:713112
 /hadoop/core/trunk/src/contrib/hdfsproxy:776175-784663
-/hadoop/hdfs/trunk/src/contrib/hdfsproxy:796829-800617,800619-803337,804756-805652,808672-809439,811495-813103,813105-813630
+/hadoop/hdfs/trunk/src/contrib/hdfsproxy:796829-800617,800619-803337,804756-805652,808672-809439,811495-813103,813105-813630,814223-815964

Modified: hadoop/hdfs/branches/HDFS-265/src/contrib/hdfsproxy/README
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/contrib/hdfsproxy/README?rev=816022&r1=816021&r2=816022&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/contrib/hdfsproxy/README (original)
+++ hadoop/hdfs/branches/HDFS-265/src/contrib/hdfsproxy/README Thu Sep 17 01:24:35 2009
@@ -1,51 +1,47 @@
-HDFSPROXY is an HTTPS proxy server that exposes the same HSFTP interface as a 
-real cluster. It authenticates users via user certificates and enforce access 
-control based on configuration files.
-
-Starting up an HDFSPROXY server is similar to starting up an HDFS cluster. 
-Simply run "hdfsproxy" shell command. The main configuration file is 
-hdfsproxy-default.xml, which should be on the classpath. hdfsproxy-env.sh 
-can be used to set up environmental variables. In particular, JAVA_HOME should 
-be set. Additional configuration files include user-certs.xml, 
-user-permissions.xml and ssl-server.xml, which are used to specify allowed user
-certs, allowed directories/files, and ssl keystore information for the proxy, 
-respectively. The location of these files can be specified in 
-hdfsproxy-default.xml. Environmental variable HDFSPROXY_CONF_DIR can be used to
-point to the directory where these configuration files are located. The 
-configuration files of the proxied HDFS cluster should also be available on the
-classpath (hdfs-default.xml and hdfs-site.xml).
-
-Mirroring those used in HDFS, a few shell scripts are provided to start and 
-stop a group of proxy servers. The hosts to run hdfsproxy on are specified in 
-hdfsproxy-hosts file, one host per line. All hdfsproxy servers are stateless 
-and run independently from each other. Simple load balancing can be set up by 
-mapping all hdfsproxy server IP addresses to a single hostname. Users should 
-use that hostname to access the proxy. If an IP address look up for that 
-hostname returns more than one IP addresses, an HFTP/HSFTP client will randomly
-pick one to use.
-
-Command "hdfsproxy -reloadPermFiles" can be used to trigger reloading of 
-user-certs.xml and user-permissions.xml files on all proxy servers listed in 
-the hdfsproxy-hosts file. Similarly, "hdfsproxy -clearUgiCache" command can be 
-used to clear the UGI caches on all proxy servers.
-
-For tomcat based installation.
-1. set up the environment and configuration files. 
-	 a) export HADOOP_CONF_DIR=${user.home}/devel/source-conf
-	 	source-conf directory should point to the source cluster's configuration directory, 
-	 	where core-site.xml, and hdfs-site.xml should already be correctly configured for 
-	 	the source cluster settings.
-	 b) export HDFSPROXY_CONF_DIR=${user.home}/devel/proxy-conf
-	  proxy-conf directory should point to the proxy's configuration directory, where 
-	  hdfsproxy-default.xml, etc, should already be properly configured.
-
-2. cd ==> hdfsproxy directory,  ant war
-	 
-3. download and install tomcat6, change tomcat conf/server.xml file to include https support. 
-	 uncomment item below SSL HTTP/1.1 Connector and add paths, resulting something look like this:
-	 <Connector port="8443" protocol="HTTP/1.1" SSLEnabled="true"
-               maxThreads="150" scheme="https" secure="true" keystoreFile="${user.home}/grid/hdfsproxy-conf/server2.keystore" 
-               keystorePass="changeme" keystoreType="JKS"  clientAuth="true" sslProtocol="TLS" />
-4. copy war file in step 2 to tomcat's webapps directory and rename it to ROOT.war
-5. export JAVA_OPTS="-Djavax.net.ssl.trustStore=${user.home}/grid/hdfsproxy-conf/server2.keystore -Djavax.net.ssl.trustStorePassword=changeme"
-6. start up tomcat with tomcat's bin/startup.sh 
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+HDFS Proxy is a proxy server through which a Hadoop client (over HSFTP) or a standard
+HTTPS client (wget, curl, etc.) can talk to a Hadoop cluster and, more importantly, pull
+data from it. It puts an access control layer in front of the Hadoop NameNode and extends
+its functionality to allow cross-version Hadoop data transfer.
+
+HDFS Proxy can be configured/started via either Jetty or Tomcat, with different supported features.
+
+A) With a Jetty-based installation, supported features include:
+> Single Hadoop source cluster data transfer
+> Single Hadoop version data transfer
+> Authentication of users via user SSL certificates with ProxyFilter installed
+> Access control enforcement based on configuration files
+
+B) With a Tomcat-based installation, supported features include:
+> Multiple Hadoop source cluster data transfer
+> Multiple Hadoop version data transfer
+> Authentication of users via user SSL certificates with ProxyFilter installed
+> Authentication and authorization via LDAP with LdapIpDirFilter installed
+> Access control based on configuration files if ProxyFilter is installed
+> Access control based on LDAP entries if LdapIpDirFilter is installed
+> Standard HTTPS GET support for file transfer
+
+The detailed configuration/setup guide is in the Forrest
+documentation, which can be found at $HADOOP_HDFS_HOME/docs. To build the
+documentation yourself from source, run the following command in
+the downloaded source folder:
+
+ant docs -Dforrest.home=<path to forrest> -Djava5.home=<path to jdk5>
+
+The built documentation will be under $HADOOP_HDFS_HOME/build/docs.
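
For illustration, here is a minimal sketch of a Hadoop client reading a file through
the proxy over HSFTP. The proxy host, port, and file path are hypothetical, and the
client JVM is assumed to already have its SSL trust material and the hsftp:// scheme
mapping configured:

    import java.io.InputStream;
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;

    public class ProxyReadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // hsftp:// selects the HSFTP client, which speaks HTTPS to the proxy.
        FileSystem fs = FileSystem.get(URI.create("hsftp://proxyhost:8443/"), conf);
        InputStream in = fs.open(new Path("/user/alice/data.txt")); // hypothetical path
        IOUtils.copyBytes(in, System.out, conf, true);              // copy to stdout, then close
      }
    }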

Modified: hadoop/hdfs/branches/HDFS-265/src/docs/src/documentation/content/xdocs/site.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/docs/src/documentation/content/xdocs/site.xml?rev=816022&r1=816021&r2=816022&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/docs/src/documentation/content/xdocs/site.xml (original)
+++ hadoop/hdfs/branches/HDFS-265/src/docs/src/documentation/content/xdocs/site.xml Thu Sep 17 01:24:35 2009
@@ -45,6 +45,7 @@
 		<native_lib    				label="Native Libraries" 					href="native_libraries.html" />
 		<streaming 				label="Streaming"          				href="streaming.html" />
 		<fair_scheduler 			label="Fair Scheduler" 					href="fair_scheduler.html"/>
+        <hdfsproxy 			label="HDFS Proxy" 					href="hdfsproxy.html"/>
 		<cap_scheduler 		label="Capacity Scheduler" 			href="capacity_scheduler.html"/>
 		<SLA					 	label="Service Level Authorization" 	href="service_level_auth.html"/>
 		<vaidya    					label="Vaidya" 								href="vaidya.html"/>

Propchange: hadoop/hdfs/branches/HDFS-265/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Sep 17 01:24:35 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/java:713112
 /hadoop/core/trunk/src/hdfs:776175-785643,785929-786278
-/hadoop/hdfs/trunk/src/java:796829-800617,800619-803337,804756-805652,808672-809439,811495-813103,813105-813630
+/hadoop/hdfs/trunk/src/java:796829-800617,800619-803337,804756-805652,808672-809439,811495-813103,813105-813630,814223-815964

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=816022&r1=816021&r2=816022&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DFSClient.java Thu Sep 17 01:24:35 2009
@@ -523,6 +523,23 @@
   }
 
   /**
+   * Call
+   * {@link #create(String,FsPermission,EnumSet,boolean,short,long,Progressable,int)}
+   * with createParent set to true.
+   */
+  public OutputStream create(String src, 
+      FsPermission permission,
+      EnumSet<CreateFlag> flag, 
+      short replication,
+      long blockSize,
+      Progressable progress,
+      int buffersize
+      ) throws IOException {
+    return create(src, permission, flag, true,
+        replication, blockSize, progress, buffersize);
+  }
+
+  /**
    * Create a new dfs file with the specified block replication 
    * with write-progress reporting and return an output stream for writing
    * into the file.  
@@ -531,14 +548,16 @@
    * @param permission The permission of the directory being created.
    * If permission == null, use {@link FsPermission#getDefault()}.
    * @param flag do not check for file existence if true
+   * @param createParent create missing parent directory if true
    * @param replication block replication
    * @return output stream
    * @throws IOException
-   * @see ClientProtocol#create(String, FsPermission, String, EnumSetWritable, short, long)
+   * @see ClientProtocol#create(String, FsPermission, String, EnumSetWritable, boolean, short, long)
    */
   public OutputStream create(String src, 
                              FsPermission permission,
                              EnumSet<CreateFlag> flag, 
+                             boolean createParent,
                              short replication,
                              long blockSize,
                              Progressable progress,
@@ -551,7 +570,7 @@
     FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
     LOG.debug(src + ": masked=" + masked);
     OutputStream result = new DFSOutputStream(src, masked,
-        flag, replication, blockSize, progress, buffersize,
+        flag, createParent, replication, blockSize, progress, buffersize,
         conf.getInt("io.bytes.per.checksum", 512));
     leasechecker.put(src, result);
     return result;
@@ -951,8 +970,9 @@
 
   /**
    */
+  @Deprecated
   public boolean mkdirs(String src) throws IOException {
-    return mkdirs(src, null);
+    return mkdirs(src, null, true);
   }
 
   /**
@@ -962,10 +982,11 @@
    * @param src The path of the directory being created
    * @param permission The permission of the directory being created.
    * If permission == null, use {@link FsPermission#getDefault()}.
+   * @param createParent create missing parent directory if true
    * @return True if the operation success.
-   * @see ClientProtocol#mkdirs(String, FsPermission)
+   * @see ClientProtocol#mkdirs(String, FsPermission, boolean)
    */
-  public boolean mkdirs(String src, FsPermission permission)throws IOException{
+  public boolean mkdirs(String src, FsPermission permission, boolean createParent)throws IOException{
     checkOpen();
     if (permission == null) {
       permission = FsPermission.getDefault();
@@ -973,11 +994,12 @@
     FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
     LOG.debug(src + ": masked=" + masked);
     try {
-      return namenode.mkdirs(src, masked);
+      return namenode.mkdirs(src, masked, createParent);
     } catch(RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class,
                                      NSQuotaExceededException.class,
                                      DSQuotaExceededException.class,
+                                     FileNotFoundException.class,
                                      FileAlreadyExistsException.class);
     }
   }
@@ -3120,10 +3142,10 @@
 
     /**
      * Create a new output stream to the given DataNode.
-     * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
+     * @see ClientProtocol#create(String, FsPermission, String, EnumSetWritable, boolean, short, long)
      */
     DFSOutputStream(String src, FsPermission masked, EnumSet<CreateFlag> flag,
-        short replication, long blockSize, Progressable progress,
+        boolean createParent, short replication, long blockSize, Progressable progress,
         int buffersize, int bytesPerChecksum) throws IOException {
       this(src, blockSize, progress, bytesPerChecksum);
 
@@ -3131,9 +3153,11 @@
 
       try {
         namenode.create(
-            src, masked, clientName, new EnumSetWritable<CreateFlag>(flag), replication, blockSize);
+            src, masked, clientName, new EnumSetWritable<CreateFlag>(flag), createParent, replication, blockSize);
       } catch(RemoteException re) {
         throw re.unwrapRemoteException(AccessControlException.class,
+                                       FileAlreadyExistsException.class,
+                                       FileNotFoundException.class,
                                        NSQuotaExceededException.class,
                                        DSQuotaExceededException.class);
       }
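
To make the widened create() signature concrete, here is a minimal caller-side
sketch. It assumes the usual DFSClient(Configuration) constructor; the path and
sizes are hypothetical:

    import java.io.FileNotFoundException;
    import java.io.OutputStream;
    import java.util.EnumSet;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CreateFlag;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.hdfs.DFSClient;

    public class NonRecursiveCreateSketch {
      public static void main(String[] args) throws Exception {
        DFSClient client = new DFSClient(new Configuration());
        try {
          OutputStream out = client.create("/existing/dir/file.txt", // hypothetical path
              FsPermission.getDefault(),
              EnumSet.of(CreateFlag.CREATE),
              false,              // createParent=false: do not create missing parents
              (short) 3,          // replication
              64L * 1024 * 1024,  // block size
              null,               // no Progressable
              4096);              // buffer size
          out.close();
        } catch (FileNotFoundException e) {
          // with createParent=false, a missing parent directory surfaces here
        }
      }
    }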

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=816022&r1=816021&r2=816022&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Thu Sep 17 01:24:35 2009
@@ -209,6 +209,18 @@
         statistics);
   }
 
+  /**
+   * Same as create(), except fails if parent directory doesn't already exist.
+   * @see #create(Path, FsPermission, EnumSet, int, short, long, Progressable)
+   */
+  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
+      EnumSet<CreateFlag> flag, int bufferSize, short replication,
+      long blockSize, Progressable progress) throws IOException {
+
+    return new FSDataOutputStream(dfs.create(getPathName(f), permission, flag,
+        false, replication, blockSize, progress, bufferSize), statistics);
+  }
+
   @Override
   public boolean setReplication(Path src, 
                                 short replication
@@ -268,9 +280,17 @@
     return stats;
   }
 
+  /**
+   * Create a directory with given name and permission, only when
+   * parent directory exists.
+   */
+  public boolean mkdir(Path f, FsPermission permission) throws IOException {
+    return dfs.mkdirs(getPathName(f), permission, false);
+  }
+
   @Override
   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
-    return dfs.mkdirs(getPathName(f), permission);
+    return dfs.mkdirs(getPathName(f), permission, true);
   }
 
   /** {@inheritDoc} */
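
A short usage sketch of the two new non-recursive entry points; the paths are
hypothetical and the configuration is assumed to point at an HDFS cluster:

    import java.util.EnumSet;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CreateFlag;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class NonRecursiveMkdirSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
        // Succeeds only when /data already exists; otherwise FileNotFoundException:
        dfs.mkdir(new Path("/data/2009-09-17"), FsPermission.getDefault());
        // createNonRecursive() likewise refuses to create missing parents:
        FSDataOutputStream out = dfs.createNonRecursive(
            new Path("/data/2009-09-17/part-00000"), FsPermission.getDefault(),
            EnumSet.of(CreateFlag.CREATE), 4096, (short) 3, 64L * 1024 * 1024, null);
        out.close();
      }
    }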

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=816022&r1=816021&r2=816022&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Thu Sep 17 01:24:35 2009
@@ -44,9 +44,9 @@
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 47: added a new method getServerDefaults(), see HDFS-578
+   * 48: modified mkdirs() to take an additional boolean parameter
    */
-  public static final long versionID = 47L;
+  public static final long versionID = 48L;
   
   ///////////////////////////////////////
   // File contents
@@ -101,6 +101,7 @@
    * @param clientName name of the current client.
    * @param flag indicates whether the file should be 
    * overwritten if it already exists or create if it does not exist or append.
+   * @param createParent create missing parent directory if true
    * @param replication block replication factor.
    * @param blockSize maximum block size.
    * 
@@ -115,6 +116,7 @@
                      FsPermission masked,
                              String clientName, 
                              EnumSetWritable<CreateFlag> flag, 
+                             boolean createParent,
                              short replication,
                              long blockSize
                              ) throws IOException;
@@ -268,6 +270,7 @@
    *
    * @param src The path of the directory being created
    * @param masked The masked permission of the directory being created
+   * @param createParent create missing parent directory if true
    * @return True if the operation success.
    * @throws {@link AccessControlException} if permission to create file is 
    * denied by the system. As usually on the client side the exception will 
@@ -275,7 +278,8 @@
    * @throws QuotaExceededException if the operation would violate 
    *                                any quota restriction.
    */
-  public boolean mkdirs(String src, FsPermission masked) throws IOException;
+  public boolean mkdirs(String src, FsPermission masked, boolean createParent)
+      throws IOException;
 
   /**
    * Get a listing of the indicated directory
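
Because the version bump makes version-47-or-older servers incompatible with the new
signatures, a client holding a ClientProtocol proxy (here called namenode) could
guard the call roughly as follows; this is only a sketch of the standard
VersionedProtocol check:

    long serverVersion = namenode.getProtocolVersion(
        ClientProtocol.class.getName(), ClientProtocol.versionID);
    if (serverVersion != ClientProtocol.versionID) {
      // an older server does not understand mkdirs(src, masked, createParent)
      throw new IOException("Protocol version mismatch: client "
          + ClientProtocol.versionID + ", server " + serverVersion);
    }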

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=816022&r1=816021&r2=816022&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Thu Sep 17 01:24:35 2009
@@ -25,6 +25,7 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.lang.Class;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -64,6 +65,9 @@
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyDefault;
+import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.io.IOUtils;
@@ -772,18 +776,31 @@
       }
     }
   }
+
+  /* Check that this Balancer is compatible with the Block Placement Policy
+   * used by the Namenode.
+   */
+  private void checkReplicationPolicyCompatibility(Configuration conf) throws UnsupportedActionException {
+    if (BlockPlacementPolicy.getInstance(conf, null, null).getClass() != 
+        BlockPlacementPolicyDefault.class) {
+      throw new UnsupportedActionException("Balancer without BlockPlacementPolicyDefault");
+    }
+  }
   
   /** Default constructor */
-  Balancer() {
+  Balancer() throws UnsupportedActionException {
+    checkReplicationPolicyCompatibility(getConf());
   }
   
   /** Construct a balancer from the given configuration */
-  Balancer(Configuration conf) {
+  Balancer(Configuration conf) throws UnsupportedActionException {
+    checkReplicationPolicyCompatibility(conf);
     setConf(conf);
   } 
 
   /** Construct a balancer from the given configuration and threshold */
-  Balancer(Configuration conf, double threshold) {
+  Balancer(Configuration conf, double threshold) throws UnsupportedActionException {
+    checkReplicationPolicyCompatibility(conf);
     setConf(conf);
     this.threshold = threshold;
   }
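
In other words, configuring any policy other than BlockPlacementPolicyDefault now
disables the Balancer. A hedged sketch, written as if from inside the balancer's own
package (the constructors are package-private) and assuming getInstance() reads the
policy class from the configuration under the hypothetical key shown:

    Configuration conf = new Configuration();
    conf.set("dfs.block.replicator.classname",        // hypothetical key name
        "com.example.MyPlacementPolicy");
    try {
      Balancer balancer = new Balancer(conf);  // runs checkReplicationPolicyCompatibility()
    } catch (UnsupportedActionException e) {
      // the Balancer refuses to start against a non-default placement policy
    }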

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=816022&r1=816021&r2=816022&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Thu Sep 17 01:24:35 2009
@@ -56,8 +56,6 @@
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.io.IOUtils;
 
-import org.mortbay.log.Log;
-
 /**************************************************
  * FSDataset manages a set of data blocks.  Each block
  * has a unique name and an extent on disk.
@@ -689,10 +687,11 @@
           }
         }
         volumes = fsvs; // replace array of volumes
+        DataNode.LOG.info("Completed FSVolumeSet.checkDirs. Removed "
+            + removed_vols.size() + " volumes. List of current volumes: "
+            + this);
       }
-      Log.info("Completed FSVolumeSet.checkDirs. Removed=" + removed_size + 
-          "volumes. List of current volumes: " +   toString());
-      
+
       return removed_vols;
     }
       
@@ -1608,7 +1607,7 @@
       }
     } // end of sync
     mlsec = System.currentTimeMillis() - mlsec;
-    DataNode.LOG.warn(">>>>>>>>>>>>Removed " + removed_blocks + " out of " + total_blocks +
+    DataNode.LOG.warn("Removed " + removed_blocks + " out of " + total_blocks +
         "(took " + mlsec + " millisecs)");
 
     // report the error

Propchange: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Sep 17 01:24:35 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java:713112
 /hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java:776175-785643,785929-786278
-/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:800619-803337,804756-805652,808672-809439,811495-813103,813105-813630
+/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:800619-803337,804756-805652,808672-809439,811495-813103,813105-813630,814223-815964

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=816022&r1=816021&r2=816022&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Thu Sep 17 01:24:35 2009
@@ -120,7 +120,7 @@
   Random r = new Random();
 
   // for block replicas placement
-  ReplicationTargetChooser replicator;
+  BlockPlacementPolicy replicator;
 
   BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
     this(fsn, conf, DEFAULT_INITIAL_MAP_CAPACITY);
@@ -137,8 +137,8 @@
   }
 
   void setConfigurationParameters(Configuration conf) throws IOException {
-    this.replicator = new ReplicationTargetChooser(
-                         conf.getBoolean("dfs.replication.considerLoad", true),
+    this.replicator = BlockPlacementPolicy.getInstance(
+                         conf,
                          namesystem,
                          namesystem.clusterMap);
 
@@ -236,7 +236,7 @@
    * @return true if the block has minimum replicas
    */
   boolean checkMinReplication(Block block) {
-    return (blocksMap.numNodes(block) >= minReplication);
+    return (countNodes(block).liveReplicas() >= minReplication);
   }
 
   /**
@@ -716,12 +716,13 @@
     int requiredReplication, numEffectiveReplicas;
     List<DatanodeDescriptor> containingNodes;
     DatanodeDescriptor srcNode;
+    INodeFile fileINode = null;
     int additionalReplRequired;
 
     synchronized (namesystem) {
       synchronized (neededReplications) {
         // block should belong to a file
-        INodeFile fileINode = blocksMap.getINode(block);
+        fileINode = blocksMap.getINode(block);
         // abandoned block or block reopened for append
         if(fileINode == null || fileINode.isUnderConstruction()) {
           neededReplications.remove(block, priority); // remove from neededReplications
@@ -768,9 +769,11 @@
     }
 
     // choose replication targets: NOT HOLDING THE GLOBAL LOCK
+    // It is costly to extract the filename for which chooseTargets is called,
+    // so for now we pass in the Inode itself.
     DatanodeDescriptor targets[] = 
-                       replicator.chooseTarget(additionalReplRequired,
-                       srcNode, containingNodes, null, block.getNumBytes());
+                       replicator.chooseTarget(fileINode, additionalReplRequired,
+                       srcNode, containingNodes, block.getNumBytes());
     if(targets.length == 0)
       return false;
 
@@ -778,7 +781,7 @@
       synchronized (neededReplications) {
         // Recheck since global lock was released
         // block should belong to a file
-        INodeFile fileINode = blocksMap.getINode(block);
+        fileINode = blocksMap.getINode(block);
         // abandoned block or block reopened for append
         if(fileINode == null || fileINode.isUnderConstruction()) {
           neededReplications.remove(block, priority); // remove from neededReplications
@@ -1245,7 +1248,7 @@
       }
     }
     namesystem.chooseExcessReplicates(nonExcess, block, replication, 
-        addedNode, delNodeHint);
+        addedNode, delNodeHint, replicator);
   }
 
   void addToExcessReplicate(DatanodeInfo dn, Block block) {
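
The net effect is that target selection is routed through whichever
BlockPlacementPolicy the configuration names. A minimal sketch of the selection and
invocation path, mirroring the calls above from inside BlockManager (the
configuration key is an assumption):

    Configuration conf = new Configuration();
    conf.setClass("dfs.block.replicator.classname",   // hypothetical key name
        BlockPlacementPolicyDefault.class, BlockPlacementPolicy.class);
    BlockPlacementPolicy replicator =
        BlockPlacementPolicy.getInstance(conf, namesystem, namesystem.clusterMap);
    DatanodeDescriptor[] targets = replicator.chooseTarget(
        fileINode, additionalReplRequired, srcNode, containingNodes, block.getNumBytes());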

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=816022&r1=816021&r2=816022&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Thu Sep 17 01:24:35 2009
@@ -946,6 +946,23 @@
     }
     return fullPathName.toString();
   }
+
+  /** Return the full path name of the specified inode */
+  static String getFullPathName(INode inode) {
+    // calculate the depth of this inode from root
+    int depth = 0;
+    for (INode i = inode; i != null; i = i.parent) {
+      depth++;
+    }
+    INode[] inodes = new INode[depth];
+
+    // fill up the inodes in the path from this inode to root
+    for (int i = 0; i < depth; i++) {
+      inodes[depth-i-1] = inode;
+      inode = inode.parent;
+    }
+    return getFullPathName(inodes, depth-1);
+  }
   
   /**
    * Create a directory 

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=816022&r1=816021&r2=816022&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Sep 17 01:24:35 2009
@@ -54,6 +54,7 @@
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Path;
@@ -90,7 +91,7 @@
  * 4)  machine --> blocklist (inverted #2)
  * 5)  LRU cache of updated-heartbeat machines
  ***************************************************/
-public class FSNamesystem implements FSConstants, FSNamesystemMBean {
+public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterStats {
   public static final Log LOG = LogFactory.getLog(FSNamesystem.class);
   public static final String AUDIT_FORMAT =
     "ugi=%s\t" +  // ugi
@@ -818,6 +819,24 @@
     return dir.getPreferredBlockSize(filename);
   }
 
+  /*
+   * Verify that parent dir exists
+   */
+  private void verifyParentDir(String src) throws FileAlreadyExistsException,
+      FileNotFoundException {
+    Path parent = new Path(src).getParent();
+    if (parent != null) {
+      INode[] pathINodes = dir.getExistingPathINodes(parent.toString());
+      if (pathINodes[pathINodes.length - 1] == null) {
+        throw new FileNotFoundException("Parent directory doesn't exist: "
+            + parent.toString());
+      } else if (!pathINodes[pathINodes.length - 1].isDirectory()) {
+        throw new FileAlreadyExistsException("Parent path is not a directory: "
+            + parent.toString());
+      }
+    }
+  }
+
   /**
    * Create a new file entry in the namespace.
    * 
@@ -828,10 +847,11 @@
    */
   void startFile(String src, PermissionStatus permissions,
                  String holder, String clientMachine,
-                 EnumSet<CreateFlag> flag, short replication, long blockSize
+                 EnumSet<CreateFlag> flag, boolean createParent, 
+                 short replication, long blockSize
                 ) throws IOException {
     startFileInternal(src, permissions, holder, clientMachine, flag,
-        replication, blockSize);
+        createParent, replication, blockSize);
     getEditLog().logSync();
     if (auditLog.isInfoEnabled()) {
       final FileStatus stat = dir.getFileInfo(src);
@@ -846,6 +866,7 @@
                                               String holder, 
                                               String clientMachine, 
                                               EnumSet<CreateFlag> flag,
+                                              boolean createParent,
                                               short replication,
                                               long blockSize
                                               ) throws IOException {
@@ -857,6 +878,7 @@
       NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
           + ", holder=" + holder
           + ", clientMachine=" + clientMachine
+          + ", createParent=" + createParent
           + ", replication=" + replication
           + ", overwrite=" + overwrite
           + ", append=" + append);
@@ -883,6 +905,10 @@
       }
     }
 
+    if (!createParent) {
+      verifyParentDir(src);
+    }
+
     try {
       INode myFile = dir.getFileINode(src);
       if (myFile != null && myFile.isUnderConstruction()) {
@@ -940,7 +966,7 @@
           else {
             //append & create a nonexist file equals to overwrite
             this.startFileInternal(src, permissions, holder, clientMachine,
-                EnumSet.of(CreateFlag.OVERWRITE), replication, blockSize);
+                EnumSet.of(CreateFlag.OVERWRITE), createParent, replication, blockSize);
             return;
           }
         } else if (myFile.isDirectory()) {
@@ -1016,7 +1042,7 @@
                             " Please refer to dfs.support.append configuration parameter.");
     }
     startFileInternal(src, null, holder, clientMachine, EnumSet.of(CreateFlag.APPEND), 
-                      (short)blockManager.maxReplication, (long)0);
+                      false, (short)blockManager.maxReplication, (long)0);
     getEditLog().logSync();
 
     //
@@ -1128,7 +1154,7 @@
 
     // choose targets for the new block to be allocated.
     DatanodeDescriptor targets[] = blockManager.replicator.chooseTarget(
-        replication, clientNode, null, blockSize);
+        src, replication, clientNode, blockSize);
     if (targets.length < blockManager.minReplication) {
       throw new IOException("File " + src + " could only be replicated to " +
                             targets.length + " nodes, instead of " +
@@ -1511,9 +1537,9 @@
   /**
    * Create all the necessary directories
    */
-  public boolean mkdirs(String src, PermissionStatus permissions
-      ) throws IOException {
-    boolean status = mkdirsInternal(src, permissions);
+  public boolean mkdirs(String src, PermissionStatus permissions,
+      boolean createParent) throws IOException {
+    boolean status = mkdirsInternal(src, permissions, createParent);
     getEditLog().logSync();
     if (status && auditLog.isInfoEnabled()) {
       final FileStatus stat = dir.getFileInfo(src);
@@ -1528,7 +1554,7 @@
    * Create all the necessary directories
    */
   private synchronized boolean mkdirsInternal(String src,
-      PermissionStatus permissions) throws IOException {
+      PermissionStatus permissions, boolean createParent) throws IOException {
     NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
     if (isPermissionEnabled) {
       checkTraverse(src);
@@ -1547,6 +1573,10 @@
       checkAncestorAccess(src, FsAction.WRITE);
     }
 
+    if (!createParent) {
+      verifyParentDir(src);
+    }
+
     // validate that we have enough inodes. This is, at best, a 
      // heuristic because the mkdirs() operation might need to
     // create multiple inodes.
@@ -2372,8 +2402,10 @@
   void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess, 
                               Block b, short replication,
                               DatanodeDescriptor addedNode,
-                              DatanodeDescriptor delNodeHint) {
+                              DatanodeDescriptor delNodeHint,
+                              BlockPlacementPolicy replicator) {
     // first form a rack to datanodes map and
+    INodeFile inode = blockManager.getINode(b);
     HashMap<String, ArrayList<DatanodeDescriptor>> rackMap =
       new HashMap<String, ArrayList<DatanodeDescriptor>>();
     for (Iterator<DatanodeDescriptor> iter = nonExcess.iterator();
@@ -2417,17 +2449,7 @@
             (priSet.contains(delNodeHint) || (addedNode != null && !priSet.contains(addedNode))) ) {
           cur = delNodeHint;
       } else { // regular excessive replica removal
-        Iterator<DatanodeDescriptor> iter = 
-          priSet.isEmpty() ? remains.iterator() : priSet.iterator();
-          while( iter.hasNext() ) {
-            DatanodeDescriptor node = iter.next();
-            long free = node.getRemaining();
-
-            if (minSpace > free) {
-              minSpace = free;
-              cur = node;
-            }
-          }
+        cur = replicator.chooseReplicaToDelete(inode, b, replication, priSet, remains);
       }
 
       firstOne = false;
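
From the client's point of view, verifyParentDir() makes the failure modes of the
non-recursive calls explicit. A sketch, reusing the dfs handle from the earlier
example and assuming the exceptions unwrap through the RPC layer as shown in the
DFSClient hunk (paths hypothetical):

    try {
      dfs.mkdir(new Path("/missing/parent/child"), FsPermission.getDefault());
    } catch (FileNotFoundException e) {
      // the parent directory does not exist
    } catch (FileAlreadyExistsException e) {
      // a parent path component exists but is a regular file
    }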

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=816022&r1=816021&r2=816022&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java Thu Sep 17 01:24:35 2009
@@ -33,7 +33,7 @@
  * This is a base INode class containing common fields for file and 
  * directory inodes.
  */
-abstract class INode implements Comparable<byte[]> {
+abstract class INode implements Comparable<byte[]>, FSInodeInfo {
   protected byte[] name;
   protected INodeDirectory parent;
   protected long modificationTime;
@@ -247,6 +247,12 @@
   }
 
   /** {@inheritDoc} */
+  public String getFullPathName() {
+    // Get the full path name of this inode.
+    return FSDirectory.getFullPathName(this);
+  }
+
+  /** {@inheritDoc} */
   public String toString() {
     return "\"" + getLocalName() + "\":" + getPermissionStatus();
   }

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=816022&r1=816021&r2=816022&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Thu Sep 17 01:24:35 2009
@@ -556,6 +556,7 @@
                      FsPermission masked,
                              String clientName, 
                              EnumSetWritable<CreateFlag> flag,
+                             boolean createParent,
                              short replication,
                              long blockSize
                              ) throws IOException {
@@ -571,7 +572,7 @@
     namesystem.startFile(src,
         new PermissionStatus(UserGroupInformation.getCurrentUGI().getUserName(),
             null, masked),
-        clientName, clientMachine, flag.get(), replication, blockSize);
+        clientName, clientMachine, flag.get(), createParent, replication, blockSize);
     myMetrics.numFilesCreated.inc();
     myMetrics.numCreateFileOps.inc();
   }
@@ -730,7 +731,7 @@
   }
     
   /** {@inheritDoc} */
-  public boolean mkdirs(String src, FsPermission masked) throws IOException {
+  public boolean mkdirs(String src, FsPermission masked, boolean createParent) throws IOException {
     stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src);
     if (!checkPathLength(src)) {
       throw new IOException("mkdirs: Pathname too long.  Limit " 
@@ -738,7 +739,7 @@
     }
     return namesystem.mkdirs(src,
         new PermissionStatus(UserGroupInformation.getCurrentUGI().getUserName(),
-            null, masked));
+            null, masked), createParent);
   }
 
   /**

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=816022&r1=816021&r2=816022&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Thu Sep 17 01:24:35 2009
@@ -253,8 +253,8 @@
                     locs.length + " replica(s).");
       }
       // verify block placement policy
-      int missingRacks = ReplicationTargetChooser.verifyBlockPlacement(
-                    lBlk, targetFileReplication, networktopology);
+      int missingRacks = BlockPlacementPolicy.getInstance(conf, null, networktopology).
+                           verifyBlockPlacement(path, lBlk, Math.min(2,targetFileReplication));
       if (missingRacks > 0) {
         res.numMisReplicatedBlocks++;
         misReplicatedPerFile++;
@@ -335,7 +335,7 @@
     String target = lostFound + file.getPath();
     String errmsg = "Failed to move " + file.getPath() + " to /lost+found";
     try {
-      if (!namenode.mkdirs(target, file.getPermission())) {
+      if (!namenode.mkdirs(target, file.getPermission(), true)) {
         LOG.warn(errmsg);
         return;
       }
@@ -501,7 +501,7 @@
       
       final FileStatus lfStatus = dfs.getFileInfo(lfName);
       if (lfStatus == null) { // not exists
-        lfInitedOk = dfs.mkdirs(lfName);
+        lfInitedOk = dfs.mkdirs(lfName, null, true);
         lostFound = lfName;
       } else if (!lfStatus.isDir()) { // exists but not a directory
         LOG.warn("Cannot use /lost+found : a regular file with this name exists.");

Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/ReplicationTargetChooser.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/ReplicationTargetChooser.java?rev=816022&r1=816021&r2=816022&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/ReplicationTargetChooser.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/namenode/ReplicationTargetChooser.java Thu Sep 17 01:24:35 2009
@@ -1,514 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import org.apache.commons.logging.*;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.net.Node;
-import org.apache.hadoop.net.NodeBase;
-import java.util.*;
-
-/** The class is responsible for choosing the desired number of targets
- * for placing block replicas.
- * The replica placement strategy is that if the writer is on a datanode,
- * the 1st replica is placed on the local machine, 
- * otherwise a random datanode. The 2nd replica is placed on a datanode
- * that is on a different rack. The 3rd replica is placed on a datanode
- * which is on a different node of the rack as the second replica.
- */
-class ReplicationTargetChooser {
-  private final boolean considerLoad; 
-  private NetworkTopology clusterMap;
-  private FSNamesystem fs;
-    
-  ReplicationTargetChooser(boolean considerLoad,  FSNamesystem fs,
-                           NetworkTopology clusterMap) {
-    this.considerLoad = considerLoad;
-    this.fs = fs;
-    this.clusterMap = clusterMap;
-  }
-    
-  private static class NotEnoughReplicasException extends Exception {
-    private static final long serialVersionUID = 1L;
-
-    NotEnoughReplicasException(String msg) {
-      super(msg);
-    }
-  }
-    
-  /**
-   * choose <i>numOfReplicas</i> data nodes for <i>writer</i> to replicate
-   * a block with size <i>blocksize</i> 
-   * If not, return as many as we can.
-   * 
-   * @param numOfReplicas: number of replicas wanted.
-   * @param writer: the writer's machine, null if not in the cluster.
-   * @param excludedNodes: datanodes that should not be considered targets.
-   * @param blocksize: size of the data to be written.
-   * @return array of DatanodeDescriptor instances chosen as targets
-   * and sorted as a pipeline.
-   */
-  DatanodeDescriptor[] chooseTarget(int numOfReplicas,
-                                    DatanodeDescriptor writer,
-                                    HashMap<Node, Node> excludedNodes,
-                                    long blocksize) {
-    if (excludedNodes == null) {
-      excludedNodes = new HashMap<Node, Node>();
-    }
-      
-    return chooseTarget(numOfReplicas, writer, 
-                        new ArrayList<DatanodeDescriptor>(), excludedNodes, blocksize);
-  }
-    
-  /**
-   * choose <i>numOfReplicas</i> data nodes for <i>writer</i> 
-   * to re-replicate a block with size <i>blocksize</i> 
-   * If not, return as many as we can.
-   * 
-   * @param numOfReplicas: additional number of replicas wanted.
-   * @param writer: the writer's machine, null if not in the cluster.
-   * @param choosenNodes: datanodes that have been chosen as targets.
-   * @param excludedNodes: datanodes that should not be considered targets.
-   * @param blocksize: size of the data to be written.
-   * @return array of DatanodeDescriptor instances chosen as target 
-   * and sorted as a pipeline.
-   */
-  DatanodeDescriptor[] chooseTarget(int numOfReplicas,
-                                    DatanodeDescriptor writer,
-                                    List<DatanodeDescriptor> choosenNodes,
-                                    HashMap<Node, Node> excludedNodes,
-                                    long blocksize) {
-    if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
-      return new DatanodeDescriptor[0];
-    }
-      
-    if (excludedNodes == null) {
-      excludedNodes = new HashMap<Node, Node>();
-    }
-      
-    int clusterSize = clusterMap.getNumOfLeaves();
-    int totalNumOfReplicas = choosenNodes.size()+numOfReplicas;
-    if (totalNumOfReplicas > clusterSize) {
-      numOfReplicas -= (totalNumOfReplicas-clusterSize);
-      totalNumOfReplicas = clusterSize;
-    }
-      
-    int maxNodesPerRack = 
-      (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
-      
-    List<DatanodeDescriptor> results = 
-      new ArrayList<DatanodeDescriptor>(choosenNodes);
-    for (Node node:choosenNodes) {
-      excludedNodes.put(node, node);
-    }
-      
-    if (!clusterMap.contains(writer)) {
-      writer=null;
-    }
-      
-    DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer, 
-                                                excludedNodes, blocksize, maxNodesPerRack, results);
-      
-    results.removeAll(choosenNodes);
-      
-    // sorting nodes to form a pipeline
-    return getPipeline((writer==null)?localNode:writer,
-                       results.toArray(new DatanodeDescriptor[results.size()]));
-  }
-    
-  /* choose <i>numOfReplicas</i> from all data nodes */
-  private DatanodeDescriptor chooseTarget(int numOfReplicas,
-                                          DatanodeDescriptor writer,
-                                          HashMap<Node, Node> excludedNodes,
-                                          long blocksize,
-                                          int maxNodesPerRack,
-                                          List<DatanodeDescriptor> results) {
-      
-    if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
-      return writer;
-    }
-      
-    int numOfResults = results.size();
-    boolean newBlock = (numOfResults==0);
-    if (writer == null && !newBlock) {
-      writer = results.get(0);
-    }
-      
-    try {
-      if (numOfResults == 0) {
-        writer = chooseLocalNode(writer, excludedNodes, 
-                                 blocksize, maxNodesPerRack, results);
-        if (--numOfReplicas == 0) {
-          return writer;
-        }
-      }
-      if (numOfResults <= 1) {
-        chooseRemoteRack(1, results.get(0), excludedNodes, 
-                         blocksize, maxNodesPerRack, results);
-        if (--numOfReplicas == 0) {
-          return writer;
-        }
-      }
-      if (numOfResults <= 2) {
-        if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
-          chooseRemoteRack(1, results.get(0), excludedNodes,
-                           blocksize, maxNodesPerRack, results);
-        } else if (newBlock){
-          chooseLocalRack(results.get(1), excludedNodes, blocksize, 
-                          maxNodesPerRack, results);
-        } else {
-          chooseLocalRack(writer, excludedNodes, blocksize,
-                          maxNodesPerRack, results);
-        }
-        if (--numOfReplicas == 0) {
-          return writer;
-        }
-      }
-      chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, 
-                   blocksize, maxNodesPerRack, results);
-    } catch (NotEnoughReplicasException e) {
-      FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of "
-               + numOfReplicas);
-    }
-    return writer;
-  }
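
The dispatch above is the default HDFS placement for a new block: the first
replica goes on the writer's own datanode (or a random node if the writer is
outside the cluster), the second on a different rack, the third on the same
rack as the second, and any extras at random. A toy trace of that order under
those rules; the rack names are hypothetical:

    import java.util.ArrayList;
    import java.util.List;

    public class PlacementOrderSketch {
      public static void main(String[] args) {
        List<String> racks = new ArrayList<String>();
        racks.add("/rackA");  // 1st replica: writer's node (chooseLocalNode)
        racks.add("/rackB");  // 2nd replica: off-rack (chooseRemoteRack)
        racks.add("/rackB");  // 3rd replica: same rack as the 2nd, because
                              // the first two sit on different racks
        System.out.println(racks); // [/rackA, /rackB, /rackB]
      }
    }
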
-    
-  /* Choose <i>localMachine</i> as the target.
-   * If <i>localMachine</i> is not available,
-   * choose a node on the same rack.
-   * @return the chosen node
-   */
-  private DatanodeDescriptor chooseLocalNode(
-                                             DatanodeDescriptor localMachine,
-                                             HashMap<Node, Node> excludedNodes,
-                                             long blocksize,
-                                             int maxNodesPerRack,
-                                             List<DatanodeDescriptor> results)
-    throws NotEnoughReplicasException {
-    // if no local machine, randomly choose one node
-    if (localMachine == null)
-      return chooseRandom(NodeBase.ROOT, excludedNodes, 
-                          blocksize, maxNodesPerRack, results);
-      
-    // otherwise try local machine first
-    Node oldNode = excludedNodes.put(localMachine, localMachine);
-    if (oldNode == null) { // was not in the excluded list
-      if (isGoodTarget(localMachine, blocksize,
-                       maxNodesPerRack, false, results)) {
-        results.add(localMachine);
-        return localMachine;
-      }
-    } 
-      
-    // try a node on local rack
-    return chooseLocalRack(localMachine, excludedNodes, 
-                           blocksize, maxNodesPerRack, results);
-  }
-    
-  /* Choose one node from the rack that <i>localMachine</i> is on.
-   * If no such node is available, choose one node from the rack
-   * that holds a second replica.
-   * If still no such node is available, choose a random node
-   * in the cluster.
-   * @return the chosen node
-   */
-  private DatanodeDescriptor chooseLocalRack(
-                                             DatanodeDescriptor localMachine,
-                                             HashMap<Node, Node> excludedNodes,
-                                             long blocksize,
-                                             int maxNodesPerRack,
-                                             List<DatanodeDescriptor> results)
-    throws NotEnoughReplicasException {
-    // no local machine, so choose a random machine
-    if (localMachine == null) {
-      return chooseRandom(NodeBase.ROOT, excludedNodes, 
-                          blocksize, maxNodesPerRack, results);
-    }
-      
-    // choose one from the local rack
-    try {
-      return chooseRandom(
-                          localMachine.getNetworkLocation(),
-                          excludedNodes, blocksize, maxNodesPerRack, results);
-    } catch (NotEnoughReplicasException e1) {
-      // find the second replica
-      DatanodeDescriptor newLocal=null;
-      for(Iterator<DatanodeDescriptor> iter=results.iterator();
-          iter.hasNext();) {
-        DatanodeDescriptor nextNode = iter.next();
-        if (nextNode != localMachine) {
-          newLocal = nextNode;
-          break;
-        }
-      }
-      if (newLocal != null) {
-        try {
-          return chooseRandom(
-                              newLocal.getNetworkLocation(),
-                              excludedNodes, blocksize, maxNodesPerRack, results);
-        } catch(NotEnoughReplicasException e2) {
-          //otherwise randomly choose one from the network
-          return chooseRandom(NodeBase.ROOT, excludedNodes,
-                              blocksize, maxNodesPerRack, results);
-        }
-      } else {
-        //otherwise randomly choose one from the network
-        return chooseRandom(NodeBase.ROOT, excludedNodes,
-                            blocksize, maxNodesPerRack, results);
-      }
-    }
-  }
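
The method above degrades gracefully through three scopes: the local rack,
then the rack of another already-chosen replica, then the whole cluster. A
self-contained model of that fallback chain; pick() is a hypothetical
stand-in for chooseRandom and is not part of the source:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Random;

    public class LocalRackFallbackSketch {
      static class NotEnoughReplicasException extends Exception {}
      static final Random RAND = new Random(0);

      // stand-in for chooseRandom(scope, ...): pick a free node under scope,
      // or fail if none is left; "/" means the whole cluster
      static String pick(String scope, List<String> free)
          throws NotEnoughReplicasException {
        List<String> inScope = new ArrayList<String>();
        for (String n : free) {
          if (scope.equals("/") || n.startsWith(scope)) inScope.add(n);
        }
        if (inScope.isEmpty()) throw new NotEnoughReplicasException();
        return inScope.get(RAND.nextInt(inScope.size()));
      }

      static String chooseLocalRack(String localRack, String secondReplicaRack,
                                    List<String> free) {
        try { return pick(localRack, free); }            // 1. local rack
        catch (NotEnoughReplicasException e1) {
          try { return pick(secondReplicaRack, free); }  // 2. another replica's rack
          catch (NotEnoughReplicasException e2) {
            try { return pick("/", free); }              // 3. anywhere
            catch (NotEnoughReplicasException e3) { return null; }
          }
        }
      }

      public static void main(String[] args) {
        List<String> free = Arrays.asList("/rackB/n1", "/rackC/n2");
        // /rackA has no free node, so the second replica's rack wins:
        System.out.println(chooseLocalRack("/rackA", "/rackB", free));
      }
    }
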
-    
-  /* Choose <i>numOfReplicas</i> nodes from the racks
-   * that <i>localMachine</i> is NOT on.
-   * If not enough nodes are available, choose the remaining ones
-   * from the local rack.
-   */
-    
-  private void chooseRemoteRack(int numOfReplicas,
-                                DatanodeDescriptor localMachine,
-                                HashMap<Node, Node> excludedNodes,
-                                long blocksize,
-                                int maxReplicasPerRack,
-                                List<DatanodeDescriptor> results)
-    throws NotEnoughReplicasException {
-    int oldNumOfReplicas = results.size();
-    // randomly choose one node from remote racks
-    try {
-      chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
-                   excludedNodes, blocksize, maxReplicasPerRack, results);
-    } catch (NotEnoughReplicasException e) {
-      chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
-                   localMachine.getNetworkLocation(), excludedNodes, blocksize, 
-                   maxReplicasPerRack, results);
-    }
-  }
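
The "~" prefix built into the scope string above asks NetworkTopology to
choose from nodes outside the given scope, i.e. off the writer's rack. A
minimal model of that inverted-scope matching; the helper below is an
illustration, not the NetworkTopology API:

    import java.util.Arrays;
    import java.util.List;

    public class ScopeSketch {
      // true if node matches the scope; a leading "~" inverts the match
      static boolean inScope(String scope, String node) {
        if (scope.startsWith("~")) {
          return !node.startsWith(scope.substring(1));
        }
        return node.startsWith(scope);
      }
      public static void main(String[] args) {
        List<String> nodes = Arrays.asList("/rackA/n1", "/rackB/n2");
        for (String n : nodes) {
          // "~/rackA" selects everything NOT under /rackA
          System.out.println(n + " -> " + inScope("~/rackA", n));
        }
        // prints: /rackA/n1 -> false, /rackB/n2 -> true
      }
    }
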
-
-  /* Randomly choose one target from <i>nodes</i>.
-   * @return the chosen node
-   */
-  private DatanodeDescriptor chooseRandom(
-                                          String nodes,
-                                          HashMap<Node, Node> excludedNodes,
-                                          long blocksize,
-                                          int maxNodesPerRack,
-                                          List<DatanodeDescriptor> results) 
-    throws NotEnoughReplicasException {
-    int numOfAvailableNodes =
-      clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
-    while(numOfAvailableNodes > 0) {
-      DatanodeDescriptor choosenNode = 
-        (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
-
-      Node oldNode = excludedNodes.put(choosenNode, choosenNode);
-      if (oldNode == null) { // choosenNode was not in the excluded list
-        numOfAvailableNodes--;
-        if (isGoodTarget(choosenNode, blocksize, maxNodesPerRack, results)) {
-          results.add(choosenNode);
-          return choosenNode;
-        }
-      }
-    }
-
-    throw new NotEnoughReplicasException(
-        "Not able to place enough replicas");
-  }
-    
-  /* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
-   */
-  private void chooseRandom(int numOfReplicas,
-                            String nodes,
-                            HashMap<Node, Node> excludedNodes,
-                            long blocksize,
-                            int maxNodesPerRack,
-                            List<DatanodeDescriptor> results)
-    throws NotEnoughReplicasException {
-      
-    int numOfAvailableNodes =
-      clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
-    while(numOfReplicas > 0 && numOfAvailableNodes > 0) {
-      DatanodeDescriptor choosenNode = 
-        (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
-      Node oldNode = excludedNodes.put(choosenNode, choosenNode);
-      if (oldNode == null) {
-        numOfAvailableNodes--;
-
-        if (isGoodTarget(choosenNode, blocksize, maxNodesPerRack, results)) {
-          numOfReplicas--;
-          results.add(choosenNode);
-        }
-      }
-    }
-      
-    if (numOfReplicas>0) {
-      throw new NotEnoughReplicasException(
-                                           "Not able to place enough replicas");
-    }
-  }
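
Both chooseRandom variants lean on HashMap.put returning the previous
mapping: a null return means the node was not yet excluded, so the
membership test and the insertion happen in a single call. A HashSet would
give the same effect; a side-by-side sketch with hypothetical node names:

    import java.util.HashMap;
    import java.util.HashSet;

    public class TestAndAddSketch {
      public static void main(String[] args) {
        HashMap<String, String> excluded = new HashMap<String, String>();
        String node = "dn1";
        if (excluded.put(node, node) == null) {
          System.out.println(node + " newly excluded");  // first call prints
        }
        if (excluded.put(node, node) == null) {
          System.out.println("never reached");           // old value is non-null
        }
        // equivalent with a set:
        HashSet<String> set = new HashSet<String>();
        System.out.println(set.add(node));  // true  (newly added)
        System.out.println(set.add(node));  // false (already present)
      }
    }
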
-    
-  /* Judge whether a node is a good target.
-   * Return true if <i>node</i> has enough space,
-   * is not too heavily loaded, and its rack does not hold too many chosen nodes.
-   */
-  private boolean isGoodTarget(DatanodeDescriptor node,
-                               long blockSize, int maxTargetPerLoc,
-                               List<DatanodeDescriptor> results) {
-    return isGoodTarget(node, blockSize, maxTargetPerLoc,
-                        this.considerLoad, results);
-  }
-    
-  private boolean isGoodTarget(DatanodeDescriptor node,
-                               long blockSize, int maxTargetPerLoc,
-                               boolean considerLoad,
-                               List<DatanodeDescriptor> results) {
-    Log logr = FSNamesystem.LOG;
-    // check if the node is (being) decommissioned
-    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
-      logr.debug("Node "+NodeBase.getPath(node)+
-                " is not chosen because the node is (being) decommissioned");
-      return false;
-    }
-
-    long remaining = node.getRemaining() - 
-                     (node.getBlocksScheduled() * blockSize); 
-    // check the remaining capacity of the target machine
-    if (blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>remaining) {
-      logr.debug("Node "+NodeBase.getPath(node)+
-                " is not chosen because the node does not have enough space");
-      return false;
-    }
-      
-    // check the communication traffic of the target machine
-    if (considerLoad) {
-      double avgLoad = 0;
-      int size = clusterMap.getNumOfLeaves();
-      if (size != 0) {
-        avgLoad = (double)fs.getTotalLoad()/size;
-      }
-      if (node.getXceiverCount() > (2.0 * avgLoad)) {
-        logr.debug("Node "+NodeBase.getPath(node)+
-                  " is not chosen because the node is too busy");
-        return false;
-      }
-    }
-      
-    // check if the target rack has chosen too many nodes
-    String rackname = node.getNetworkLocation();
-    int counter=1;
-    for(Iterator<DatanodeDescriptor> iter = results.iterator();
-        iter.hasNext();) {
-      Node result = iter.next();
-      if (rackname.equals(result.getNetworkLocation())) {
-        counter++;
-      }
-    }
-    if (counter>maxTargetPerLoc) {
-      logr.debug("Node "+NodeBase.getPath(node)+
-                " is not chosen because the rack has too many chosen nodes");
-      return false;
-    }
-    return true;
-  }
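
Numerically, the space check above charges the node for blocks it has
already been scheduled to receive, and the load check rejects any node
busier than twice the cluster average. A worked example with hypothetical
figures; MIN_BLOCKS_FOR_WRITE is assumed to be 5 here for illustration:

    public class GoodTargetSketch {
      public static void main(String[] args) {
        long blockSize = 64L * 1024 * 1024;          // 64 MB block
        long reportedFree = 1024L * 1024 * 1024;     // 1 GB free on the node
        int blocksScheduled = 4;                     // writes already headed here
        int minBlocksForWrite = 5;                   // assumed FSConstants value

        long remaining = reportedFree - blocksScheduled * blockSize; // 768 MB
        // need room for at least minBlocksForWrite full blocks: 320 MB here
        System.out.println("enough space: "
            + (blockSize * minBlocksForWrite <= remaining));          // true

        double avgLoad = 8.0;  // total xceivers / number of datanodes
        int xceiverCount = 20; // this node's active transfer threads
        // 20 > 2 * 8, so the node is considered too busy and is rejected
        System.out.println("too busy: " + (xceiverCount > 2.0 * avgLoad));
      }
    }
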
-    
-  /* Return a pipeline of nodes.
-   * The pipeline is formed by finding a shortest path that
-   * starts from the writer and traverses all <i>nodes</i>.
-   * This is basically a traveling salesman problem.
-   */
-  private DatanodeDescriptor[] getPipeline(
-                                           DatanodeDescriptor writer,
-                                           DatanodeDescriptor[] nodes) {
-    if (nodes.length==0) return nodes;
-      
-    synchronized(clusterMap) {
-      int index=0;
-      if (writer == null || !clusterMap.contains(writer)) {
-        writer = nodes[0];
-      }
-      for(;index<nodes.length; index++) {
-        DatanodeDescriptor shortestNode = nodes[index];
-        int shortestDistance = clusterMap.getDistance(writer, shortestNode);
-        int shortestIndex = index;
-        for(int i=index+1; i<nodes.length; i++) {
-          DatanodeDescriptor currentNode = nodes[i];
-          int currentDistance = clusterMap.getDistance(writer, currentNode);
-          if (shortestDistance>currentDistance) {
-            shortestDistance = currentDistance;
-            shortestNode = currentNode;
-            shortestIndex = i;
-          }
-        }
-        //switch position index & shortestIndex
-        if (index != shortestIndex) {
-          nodes[shortestIndex] = nodes[index];
-          nodes[index] = shortestNode;
-        }
-        writer = shortestNode;
-      }
-    }
-    return nodes;
-  }
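
getPipeline above is a greedy nearest-neighbor heuristic: at each step it
swaps the closest remaining node into the next slot and continues from
there. A self-contained sketch of the same selection-by-distance, with a
toy two-level metric standing in for NetworkTopology.getDistance:

    import java.util.Arrays;

    public class PipelineSketch {
      // toy metric: nodes are labeled by rack id; same rack costs 2, remote 4
      static int distance(int a, int b) { return a == b ? 2 : 4; }

      static int[] pipeline(int writer, int[] nodes) {
        for (int index = 0; index < nodes.length; index++) {
          int shortest = index;
          for (int i = index + 1; i < nodes.length; i++) {
            if (distance(writer, nodes[i]) < distance(writer, nodes[shortest])) {
              shortest = i;
            }
          }
          int tmp = nodes[index];
          nodes[index] = nodes[shortest];
          nodes[shortest] = tmp;
          writer = nodes[index]; // the next hop starts from the node just placed
        }
        return nodes;
      }

      public static void main(String[] args) {
        // writer on rack 1; candidates on racks 2, 1, 2
        System.out.println(Arrays.toString(pipeline(1, new int[]{2, 1, 2})));
        // [1, 2, 2]: the local-rack node leads, the remote pair follows
      }
    }
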
-
-  /**
-   * Verify that the block is replicated on at least 2 different racks
-   * if there is more than one rack in the system.
-   * 
-   * @param lBlk block with locations
-   * @param cluster 
-   * @return 1 if the block must be replicated on an additional rack,
-   * or 0 if the number of racks is sufficient.
-   */
-  public static int verifyBlockPlacement(LocatedBlock lBlk,
-                                         short replication,
-                                         NetworkTopology cluster) {
-    int numRacks = verifyBlockPlacement(lBlk, Math.min(2,replication), cluster);
-    return numRacks < 0 ? 0 : numRacks;
-  }
-
-  /**
-   * Verify that the block is replicated on at least minRacks different racks
-   * if there are more than minRacks racks in the system.
-   * 
-   * @param lBlk block with locations
-   * @param minRacks number of racks the block should be replicated to
-   * @param cluster 
-   * @return the difference between the required and the actual number of racks
-   * the block is replicated to.
-   */
-  public static int verifyBlockPlacement(LocatedBlock lBlk,
-                                         int minRacks,
-                                         NetworkTopology cluster) {
-    DatanodeInfo[] locs = lBlk.getLocations();
-    if (locs == null)
-      locs = new DatanodeInfo[0];
-    int numRacks = cluster.getNumOfRacks();
-    if(numRacks <= 1) // only one rack
-      return 0;
-    minRacks = Math.min(minRacks, numRacks);
-    // 1. Check that all locations are different.
-    // 2. Count locations on different racks.
-    Set<String> racks = new TreeSet<String>();
-    for (DatanodeInfo dn : locs)
-      racks.add(dn.getNetworkLocation());
-    return minRacks - racks.size();
-  }
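
The deficit computation above reduces to counting distinct rack names among
the replica locations; a result of zero or less means enough racks are
covered. Stripped to its core, with hypothetical rack strings:

    import java.util.Set;
    import java.util.TreeSet;

    public class RackDeficitSketch {
      public static void main(String[] args) {
        String[] replicaRacks = { "/rackA", "/rackA", "/rackB" };
        int minRacks = 2;
        Set<String> racks = new TreeSet<String>();
        for (String r : replicaRacks) racks.add(r);
        // 2 required - 2 actual = 0: no additional rack needed
        System.out.println("deficit: " + (minRacks - racks.size()));
      }
    }
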
-} //end of Replicator
-

Propchange: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Sep 17 01:24:35 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/test/hdfs:713112
 /hadoop/core/trunk/src/test/hdfs:776175-785643
-/hadoop/hdfs/trunk/src/test/hdfs:796829-800617,800619-803337,804756-805652,808672-809439,811495-813103,813105-813630
+/hadoop/hdfs/trunk/src/test/hdfs:796829-800617,800619-803337,804756-805652,808672-809439,811495-813103,813105-813630,814223-815964

Propchange: hadoop/hdfs/branches/HDFS-265/src/test/hdfs-with-mr/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Sep 17 01:24:35 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/test/hdfs-with-mr:713112
 /hadoop/core/trunk/src/test/hdfs-with-mr:776175-784663
-/hadoop/hdfs/trunk/src/test/hdfs-with-mr:796829-800617,800619-803337,804756-805652,808672-809439,811495-813103,813105-813630
+/hadoop/hdfs/trunk/src/test/hdfs-with-mr:796829-800617,800619-803337,804756-805652,808672-809439,811495-813103,813105-813630,814223-815964

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=816022&r1=816021&r2=816022&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java Thu Sep 17 01:24:35 2009
@@ -157,7 +157,7 @@
     
     public FsServerDefaults getServerDefaults() throws IOException { return null; }
     
-    public void create(String src, FsPermission masked, String clientName, EnumSetWritable<CreateFlag> flag, short replication, long blockSize) throws IOException {}
+    public void create(String src, FsPermission masked, String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize) throws IOException {}
     
     public LocatedBlock append(String src, String clientName) throws IOException { return null; }
 
@@ -179,7 +179,7 @@
 
     public boolean delete(String src, boolean recursive) throws IOException { return false; }
 
-    public boolean mkdirs(String src, FsPermission masked) throws IOException { return false; }
+    public boolean mkdirs(String src, FsPermission masked, boolean createParent) throws IOException { return false; }
 
     public FileStatus[] getListing(String src) throws IOException { return null; }
 

Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSMkdirs.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSMkdirs.java?rev=816022&r1=816021&r2=816022&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSMkdirs.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSMkdirs.java Thu Sep 17 01:24:35 2009
@@ -20,8 +20,10 @@
 import junit.framework.TestCase;
 import java.io.*;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 
 
 /**
@@ -73,4 +75,46 @@
       cluster.shutdown();
     }
   }
+  
+  /**
+   * Tests mkdir will not create directory when parent is missing.
+   */
+  public void testMkdir() throws IOException {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+    DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem();
+    try {
+      // Create a dir in root dir, should succeed
+      assertTrue(dfs.mkdir(new Path("/mkdir-" + System.currentTimeMillis()),
+          FsPermission.getDefault()));
+      // Create a dir when parent dir exists as a file, should fail
+      IOException expectedException = null;
+      String filePath = "/mkdir-file-" + System.currentTimeMillis();
+      writeFile(dfs, new Path(filePath));
+      try {
+        dfs.mkdir(new Path(filePath + "/mkdir"), FsPermission.getDefault());
+      } catch (IOException e) {
+        expectedException = e;
+      }
+      assertTrue("Create a directory when parent dir exists as file using"
+          + " mkdir() should throw FileAlreadyExistsException ",
+          expectedException != null
+              && expectedException instanceof FileAlreadyExistsException);
+      // Create a dir in a non-exist directory, should fail
+      expectedException = null;
+      try {
+        dfs.mkdir(new Path("/non-exist/mkdir-" + System.currentTimeMillis()),
+            FsPermission.getDefault());
+      } catch (IOException e) {
+        expectedException = e;
+      }
+      assertTrue("Create a directory in a non-exist parent dir using"
+          + " mkdir() should throw FileNotFoundException ",
+          expectedException != null
+              && expectedException instanceof FileNotFoundException);
+    } finally {
+      dfs.close();
+      cluster.shutdown();
+    }
+  }
 }

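The contract this test pins down, mkdir failing fast when the parent is
missing while mkdirs creates ancestors, mirrors the java.nio.file pair
shown below. The local-filesystem demo is only an analogy, not HDFS code:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.NoSuchFileException;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class MkdirContractDemo {
      public static void main(String[] args) throws IOException {
        Path child = Paths.get("demo-root/non-exist/child");
        try {
          Files.createDirectory(child);   // non-recursive, like mkdir()
        } catch (NoSuchFileException e) {
          System.out.println("parent missing: " + e.getClass().getSimpleName());
        }
        Files.createDirectories(child);   // recursive, like mkdirs(createParent)
        System.out.println("created: " + Files.isDirectory(child));
      }
    }
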
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java?rev=816022&r1=816021&r2=816022&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java Thu Sep 17 01:24:35 2009
@@ -19,6 +19,7 @@
 
 import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -30,6 +31,7 @@
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
@@ -735,6 +737,99 @@
     }
   }
   
+  /**
+   * Test file creation using createNonRecursive().
+   */
+  public void testFileCreationNonRecursive() throws IOException {
+    Configuration conf = new Configuration();
+    if (simulatedStorage) {
+      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    }
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+    FileSystem fs = cluster.getFileSystem();
+    final Path path = new Path("/" + System.currentTimeMillis()
+        + "-testFileCreationNonRecursive");
+    FSDataOutputStream out = null;
+
+    try {
+      IOException expectedException = null;
+      final String nonExistDir = "/non-exist-" + System.currentTimeMillis();
+
+      fs.delete(new Path(nonExistDir), true);
+      EnumSet<CreateFlag> createFlag = EnumSet.of(CreateFlag.CREATE);
+      // Create a new file in root dir, should succeed
+      out = createNonRecursive(fs, path, 1, createFlag);
+      out.close();
+      // Create a file when parent dir exists as file, should fail
+      expectedException = null;
+      try {
+        createNonRecursive(fs, new Path(path, "Create"), 1, createFlag);
+      } catch (IOException e) {
+        expectedException = e;
+      }
+      assertTrue("Create a file when parent directory exists as a file"
+          + " should throw FileAlreadyExistsException ",
+          expectedException != null
+              && expectedException instanceof FileAlreadyExistsException);
+      fs.delete(path, true);
+      // Create a file in a non-exist directory, should fail
+      final Path path2 = new Path(nonExistDir + "/testCreateNonRecursive");
+      expectedException = null;
+      try {
+        createNonRecursive(fs, path2, 1, createFlag);
+      } catch (IOException e) {
+        expectedException = e;
+      }
+      assertTrue("Create a file in a non-exist dir using"
+          + " createNonRecursive() should throw FileNotFoundException ",
+          expectedException != null
+              && expectedException instanceof FileNotFoundException);
+
+      EnumSet<CreateFlag> overwriteFlag = EnumSet.of(CreateFlag.OVERWRITE);
+      // Overwrite a file in root dir, should succeed
+      out = createNonRecursive(fs, path, 1, overwriteFlag);
+      out.close();
+      // Overwrite a file when parent dir exists as file, should fail
+      expectedException = null;
+      try {
+        createNonRecursive(fs, new Path(path, "Overwrite"), 1, overwriteFlag);
+      } catch (IOException e) {
+        expectedException = e;
+      }
+      assertTrue("Overwrite a file when parent directory exists as a file"
+          + " should throw FileAlreadyExistsException ",
+          expectedException != null
+              && expectedException instanceof FileAlreadyExistsException);
+      fs.delete(path, true);
+      // Overwrite a file in a non-exist directory, should fail
+      final Path path3 = new Path(nonExistDir + "/testOverwriteNonRecursive");
+      expectedException = null;
+      try {
+        createNonRecursive(fs, path3, 1, overwriteFlag);
+      } catch (IOException e) {
+        expectedException = e;
+      }
+      assertTrue("Overwrite a file in a non-exist dir using"
+          + " createNonRecursive() should throw FileNotFoundException ",
+          expectedException != null
+              && expectedException instanceof FileNotFoundException);
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
+  // creates a file using DistributedFileSystem.createNonRecursive()
+  static FSDataOutputStream createNonRecursive(FileSystem fs, Path name,
+      int repl, EnumSet<CreateFlag> flag) throws IOException {
+    System.out.println("createNonRecursive: Created " + name + " with " + repl
+        + " replica.");
+    FSDataOutputStream stm = ((DistributedFileSystem) fs).createNonRecursive(
+        name, FsPermission.getDefault(), flag, fs.getConf().getInt(
+            "io.file.buffer.size", 4096), (short) repl, (long) blockSize, null);
+    return stm;
+  }
+  
   // creates a file with the flag api
   static FSDataOutputStream createFileWithFlag(FileSystem fileSys, Path name, int repl, EnumSet<CreateFlag> flag)
     throws IOException {

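createNonRecursive, exercised above, applies the same fail-fast parent rule
to file creation. As a local-filesystem analogy (again java.nio.file, not
the HDFS API):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.NoSuchFileException;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public class CreateNonRecursiveDemo {
      public static void main(String[] args) throws IOException {
        Path file = Paths.get("demo-root/non-exist/file.txt");
        try {
          // CREATE_NEW refuses to create missing parents, like createNonRecursive
          Files.newOutputStream(file, StandardOpenOption.CREATE_NEW).close();
        } catch (NoSuchFileException e) {
          System.out.println("parent missing: " + e.getClass().getSimpleName());
        }
      }
    }
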
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java?rev=816022&r1=816021&r2=816022&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java Thu Sep 17 01:24:35 2009
@@ -516,7 +516,7 @@
       // dummyActionNoSynch(fileIdx);
       nameNode.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
                       clientName, new EnumSetWritable<CreateFlag>(EnumSet
-              .of(CreateFlag.OVERWRITE)), replication, BLOCK_SIZE);
+              .of(CreateFlag.OVERWRITE)), true, replication, BLOCK_SIZE);
       long end = System.currentTimeMillis();
       for(boolean written = !closeUponCreate; !written; 
         written = nameNode.complete(fileNames[daemonId][inputIdx],
@@ -895,7 +895,7 @@
       for(int idx=0; idx < nrFiles; idx++) {
         String fileName = nameGenerator.getNextFileName("ThroughputBench");
         nameNode.create(fileName, FsPermission.getDefault(), clientName,
-            new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.OVERWRITE)), replication,
+            new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.OVERWRITE)), true, replication,
             BLOCK_SIZE);
         Block lastBlock = addBlocks(fileName, clientName);
         nameNode.complete(fileName, clientName, lastBlock);