Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2013/06/21 08:37:39 UTC

svn commit: r1495297 [17/46] - in /hadoop/common/branches/branch-1-win: ./ bin/ conf/ ivy/ lib/jdiff/ src/c++/libhdfs/docs/ src/c++/libhdfs/tests/conf/ src/contrib/capacity-scheduler/ivy/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred...

Modified: hadoop/common/branches/branch-1-win/src/examples/org/apache/hadoop/examples/AggregateWordHistogram.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/examples/org/apache/hadoop/examples/AggregateWordHistogram.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/examples/org/apache/hadoop/examples/AggregateWordHistogram.java (original)
+++ hadoop/common/branches/branch-1-win/src/examples/org/apache/hadoop/examples/AggregateWordHistogram.java Fri Jun 21 06:37:27 2013
@@ -72,8 +72,9 @@ public class AggregateWordHistogram {
    */
   @SuppressWarnings("unchecked")
   public static void main(String[] args) throws IOException {
-    JobConf conf = ValueAggregatorJob.createValueAggregatorJob(args
-        , new Class[] {AggregateWordHistogramPlugin.class});
+    JobConf conf = ValueAggregatorJob.createValueAggregatorJob(args,
+        new Class[] { AggregateWordHistogramPlugin.class },
+        AggregateWordHistogram.class);
     
     JobClient.runJob(conf);
   }
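
The third argument added above presumably lets ValueAggregatorJob set the job jar from the calling class (setJarByClass-style), so the example can be submitted straight from the examples jar. Below is a minimal driver sketch against the three-argument overload referenced in the hunk; the surrounding HistogramDriver class is hypothetical and only mirrors the call shown above:

import java.io.IOException;
import org.apache.hadoop.examples.AggregateWordHistogram;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorJob;

public class HistogramDriver {
  @SuppressWarnings("unchecked")
  public static void main(String[] args) throws IOException {
    // args are the usual aggregate-job arguments (input dirs, output dir, number of reducers, ...)
    JobConf conf = ValueAggregatorJob.createValueAggregatorJob(args,
        new Class[] { AggregateWordHistogram.AggregateWordHistogramPlugin.class },
        AggregateWordHistogram.class);
    JobClient.runJob(conf);
  }
}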

Modified: hadoop/common/branches/branch-1-win/src/examples/org/apache/hadoop/examples/WordCount.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/examples/org/apache/hadoop/examples/WordCount.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/examples/org/apache/hadoop/examples/WordCount.java (original)
+++ hadoop/common/branches/branch-1-win/src/examples/org/apache/hadoop/examples/WordCount.java Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+/**
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+
 package org.apache.hadoop.examples;
 
 import java.io.IOException;

Modified: hadoop/common/branches/branch-1-win/src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java (original)
+++ hadoop/common/branches/branch-1-win/src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java Fri Jun 21 06:37:27 2013
@@ -162,17 +162,36 @@ public class DistributedPentomino extend
     int height = 10;
     Class<? extends Pentomino> pentClass;
     if (args.length == 0) {
-      System.out.println("pentomino <output>");
+      System.out
+          .println("Usage: pentomino <output> [-depth #] [-height #] [-width #]");
       ToolRunner.printGenericCommandUsage(System.out);
       return -1;
     }
     
     conf = new JobConf(getConf());
+
+    // Pick up the parameters if the user has already set them in the conf
     width = conf.getInt("pent.width", width);
     height = conf.getInt("pent.height", height);
     depth = conf.getInt("pent.depth", depth);
     pentClass = conf.getClass("pent.class", OneSidedPentomino.class, Pentomino.class);
     
+    for (int i = 0; i < args.length; i++) {
+      if (args[i].equalsIgnoreCase("-depth")) {
+        depth = Integer.parseInt(args[++i].trim());
+      } else if (args[i].equalsIgnoreCase("-height")) {
+        height = Integer.parseInt(args[++i].trim());
+      } else if (args[i].equalsIgnoreCase("-width")) {
+        width = Integer.parseInt(args[++i].trim());
+      }
+    }
+
+    // Write the parameters back into the conf so MR tasks pick them up,
+    // whether or not the user set them on the command line
+    conf.setInt("pent.width", width);
+    conf.setInt("pent.height", height);
+    conf.setInt("pent.depth", depth);
+
     Path output = new Path(args[0]);
     Path input = new Path(output + "_input");
     FileSystem fileSys = FileSystem.get(conf);
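
With the parsing added above, the board dimensions can come either from the pent.* configuration properties or from the new command-line flags, and the final values are written back into the JobConf so that map tasks read the same settings. A hedged usage sketch (jar name, output path and flag values are illustrative only):

  hadoop jar hadoop-examples.jar pentomino /tmp/pentomino-out -depth 5 -height 10 -width 12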

Modified: hadoop/common/branches/branch-1-win/src/examples/pipes/Makefile.am
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/examples/pipes/Makefile.am?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/examples/pipes/Makefile.am (original)
+++ hadoop/common/branches/branch-1-win/src/examples/pipes/Makefile.am Fri Jun 21 06:37:27 2013
@@ -17,7 +17,7 @@ ACLOCAL_AMFLAGS = -I ../../c++/utils/m4
 AM_CXXFLAGS=-Wall -I$(HADOOP_UTILS_PREFIX)/include \
             -I$(HADOOP_PIPES_PREFIX)/include
 LDADD=-L$(HADOOP_UTILS_PREFIX)/lib -L$(HADOOP_PIPES_PREFIX)/lib \
-      -lhadooppipes -lhadooputils
+      -lhadooppipes -lhadooputils -lcrypto
 
 bin_PROGRAMS= wordcount-simple wordcount-part wordcount-nopipe pipes-sort
 

Modified: hadoop/common/branches/branch-1-win/src/hdfs/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/hdfs-default.xml?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/hdfs-default.xml (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/hdfs-default.xml Fri Jun 21 06:37:27 2013
@@ -38,7 +38,7 @@ creations/deletions), or "all".</descrip
   <name>dfs.datanode.address</name>
   <value>0.0.0.0:50010</value>
   <description>
-    The address where the datanode server will listen to.
+    The datanode server address and port for data transfer.
     If the port is 0 then the server will start on a free port.
   </description>
 </property>
@@ -333,9 +333,24 @@ creations/deletions), or "all".</descrip
   <description>
     Specifies the percentage of blocks that should satisfy 
     the minimal replication requirement defined by dfs.replication.min.
-    Values less than or equal to 0 mean not to start in safe mode.
+    Values less than or equal to 0 mean not to wait for any particular
+    percentage of blocks before exiting safemode.
     Values greater than 1 will make safe mode permanent.
   </description>
+ </property>
+ 
+<property>
+  <name>dfs.namenode.safemode.min.datanodes</name>
+  <value>0</value>
+  <description>
+    Specifies the number of datanodes that must be considered alive
+    before the name node exits safemode.
+    Values less than or equal to 0 mean not to take the number of live
+    datanodes into account when deciding whether to remain in safe mode
+    during startup.
+    Values greater than the number of datanodes in the cluster
+    will make safe mode permanent.
+  </description>
 </property>
 
 <property>
@@ -416,10 +431,10 @@ creations/deletions), or "all".</descrip
 
 <property>
   <name>dfs.support.append</name>
-  <value>false</value>
-  <description>Does HDFS allow appends to files?
-               This is currently set to false because there are bugs in the
-               "append code" and is not supported in any prodction cluster.
+  <description>
+    This option is no longer supported. HBase no longer requires that
+    this option be enabled as sync is now enabled by default. See
+    HADOOP-8230 for additional information.
   </description>
 </property>
 
@@ -464,6 +479,234 @@ creations/deletions), or "all".</descrip
 </property>
 
 <property>
+  <name>dfs.datanode.readahead.bytes</name>
+  <value>4193404</value>
+  <description>
+        While reading block files, if the Hadoop native libraries are available,
+        the datanode can use the posix_fadvise system call to explicitly
+        page data into the operating system buffer cache ahead of the current
+        reader's position. This can improve performance especially when
+        disks are highly contended.
+
+        This configuration specifies the number of bytes ahead of the current
+        read position which the datanode will attempt to read ahead. This
+        feature may be disabled by configuring this property to 0.
+
+        If the native libraries are not available, this configuration has no
+        effect.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.drop.cache.behind.reads</name>
+  <value>false</value>
+  <description>
+        In some workloads, the data read from HDFS is known to be significantly
+        large enough that it is unlikely to be useful to cache it in the
+        operating system buffer cache. In this case, the DataNode may be
+        configured to automatically purge all data from the buffer cache
+        after it is delivered to the client. This behavior is automatically
+        disabled for workloads which read only short sections of a block
+        (e.g. HBase random-IO workloads).
+
+        This may improve performance for some workloads by freeing buffer
+        cache space usage for more cacheable data.
+
+        If the Hadoop native libraries are not available, this configuration
+        has no effect.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.drop.cache.behind.writes</name>
+  <value>false</value>
+  <description>
+        In some workloads, the data written to HDFS is known to be significantly
+        large enough that it is unlikely to be useful to cache it in the
+        operating system buffer cache. In this case, the DataNode may be
+        configured to automatically purge all data from the buffer cache
+        after it is written to disk.
+
+        This may improve performance for some workloads by freeing buffer
+        cache space usage for more cacheable data.
+
+        If the Hadoop native libraries are not available, this configuration
+        has no effect.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.sync.behind.writes</name>
+  <value>false</value>
+  <description>
+        If this configuration is enabled, the datanode will instruct the
+        operating system to enqueue all written data to the disk immediately
+        after it is written. This differs from the usual OS policy which
+        may wait for up to 30 seconds before triggering writeback.
+
+        This may improve performance for some workloads by smoothing the
+        IO profile for data written to disk.
+
+        If the Hadoop native libraries are not available, this configuration
+        has no effect.
+  </description>
+</property>
+
+<property>
+  <name>dfs.client.use.datanode.hostname</name>
+  <value>false</value>
+  <description>Whether clients should use datanode hostnames when
+    connecting to datanodes.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.use.datanode.hostname</name>
+  <value>false</value>
+  <description>Whether datanodes should use datanode hostnames when
+    connecting to other datanodes for data transfer.
+  </description>
+</property>
+
+<property>
+  <name>dfs.client.local.interfaces</name>
+  <value></value>
+  <description>A comma separated list of network interface names to use
+    for data transfer between the client and datanodes. When creating
+    a connection to read from or write to a datanode, the client
+    chooses one of the specified interfaces at random and binds its
+    socket to the IP of that interface. Individual names may be
+    specified as either an interface name (eg "eth0"), a subinterface
+    name (eg "eth0:0"), or an IP address (which may be specified using
+    CIDR notation to match a range of IPs).
+  </description>
+</property>
+
+<property>
+  <name>dfs.image.transfer.bandwidthPerSec</name>
+  <value>0</value>
+  <description>
+    Specifies the maximum amount of bandwidth that can be utilized
+    for image transfer, in bytes per second.
+    A default value of 0 indicates that throttling is disabled.
+  </description>
+</property>
+
+<property>
+  <name>dfs.webhdfs.enabled</name>
+  <value>false</value>
+  <description>
+    Enable WebHDFS (REST API) in Namenodes and Datanodes.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.kerberos.internal.spnego.principal</name>
+  <value>${dfs.web.authentication.kerberos.principal}</value>
+</property>
+
+<property>
+  <name>dfs.secondary.namenode.kerberos.internal.spnego.principal</name>
+  <value>${dfs.web.authentication.kerberos.principal}</value>
+</property>
+
+<property>
+  <name>dfs.namenode.invalidate.work.pct.per.iteration</name>
+  <value>0.32f</value>
+  <description>
+    *Note*: Advanced property. Change with caution.
+    This determines the percentage of block
+    invalidations (deletes) to issue in a single DN heartbeat
+    deletion command. The final deletion count is determined by applying this
+    percentage to the number of live nodes in the system.
+    The resultant number is the number of blocks from the deletion list
+    chosen for proper invalidation over a single heartbeat of a single DN.
+    Value should be a positive, non-zero percentage in float notation (X.Yf),
+    with 1.0f meaning 100%.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.replication.work.multiplier.per.iteration</name>
+  <value>2</value>
+  <description>
+    *Note*: Advanced property. Change with caution.
+    This determines the total amount of block transfers to begin in
+    parallel at a DN, for replication, when such a command list is being
+    sent over a DN heartbeat by the NN. The actual number is obtained by
+    multiplying this multiplier with the total number of live nodes in the
+    cluster. The result number is the number of blocks to begin transfers
+    immediately for, per DN heartbeat. This number can be any positive,
+    non-zero integer.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.avoid.read.stale.datanode</name>
+  <value>false</value>
+  <description>
+    Indicate whether or not to avoid reading from &quot;stale&quot; datanodes whose
+    heartbeat messages have not been received by the namenode 
+    for more than a specified time interval. Stale datanodes will be
+    moved to the end of the node list returned for reading. See
+    dfs.namenode.avoid.write.stale.datanode for a similar setting for writes.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.avoid.write.stale.datanode</name>
+  <value>false</value>
+  <description>
+    Indicate whether or not to avoid writing to &quot;stale&quot; datanodes whose 
+    heartbeat messages have not been received by the namenode 
+    for more than a specified time interval. Writes will avoid using 
+    stale datanodes unless more than a configured ratio 
+    (dfs.namenode.write.stale.datanode.ratio) of datanodes are marked as 
+    stale. See dfs.namenode.avoid.read.stale.datanode for a similar setting
+    for reads.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.stale.datanode.interval</name>
+  <value>30000</value>
+  <description>
+    Time interval after which a datanode is marked as "stale": if the
+    namenode has not received a heartbeat message from a datanode for more
+    than this interval, the datanode is marked and treated as "stale".
+    The interval cannot be made too small, since that would cause datanodes
+    to flap between fresh and stale states; a minimum stale interval (by
+    default 3 times the heartbeat interval) is therefore enforced as a
+    lower bound.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.write.stale.datanode.ratio</name>
+  <value>0.5f</value>
+  <description>
+    When the ratio of stale datanodes to total datanodes exceeds this
+    value, stop avoiding writes to stale nodes so as to prevent
+    creating hotspots.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.plugins</name>
+  <value></value>
+  <description>Comma-separated list of datanode plug-ins to be activated.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.plugins</name>
+  <value></value>
+  <description>Comma-separated list of namenode plug-ins to be activated.
+  </description>
+</property>
+
+<property>
   <name>dfs.namenode.edits.noeditlogchannelflush</name>
   <value>false</value>
   <description>

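The new properties above ship with defaults in hdfs-default.xml, so only the values an operator actually wants to change need to appear in hdfs-site.xml. A minimal, hypothetical override sketch follows; the values are for illustration only, and the dfs.namenode.* / dfs.datanode.* keys take effect on the servers rather than on clients:

<configuration>
  <!-- Have clients connect to datanodes by hostname, e.g. in multi-homed setups -->
  <property>
    <name>dfs.client.use.datanode.hostname</name>
    <value>true</value>
  </property>
  <!-- Avoid reading from datanodes whose heartbeats are older than 20 seconds -->
  <property>
    <name>dfs.namenode.avoid.read.stale.datanode</name>
    <value>true</value>
  </property>
  <property>
    <name>dfs.namenode.stale.datanode.interval</name>
    <value>20000</value>
  </property>
  <!-- Enable the WebHDFS REST API -->
  <property>
    <name>dfs.webhdfs.enabled</name>
    <value>true</value>
  </property>
</configuration>
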
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java Fri Jun 21 06:37:27 2013
@@ -84,11 +84,11 @@ class BlockReaderLocal extends FSInputCh
     }
 
     private synchronized ClientDatanodeProtocol getDatanodeProxy(
-        DatanodeInfo node, Configuration conf, int socketTimeout)
-        throws IOException {
+        DatanodeInfo node, Configuration conf, int socketTimeout,
+        boolean connectToDnViaHostname) throws IOException {
       if (proxy == null) {
         proxy = DFSClient.createClientDatanodeProtocolProxy(node, conf,
-            socketTimeout);
+            socketTimeout, connectToDnViaHostname);
       }
       return proxy;
     }
@@ -135,13 +135,14 @@ class BlockReaderLocal extends FSInputCh
    */
   static BlockReaderLocal newBlockReader(Configuration conf,
     String file, Block blk, Token<BlockTokenIdentifier> token, DatanodeInfo node, 
-    int socketTimeout, long startOffset, long length) throws IOException {
+    int socketTimeout, long startOffset, long length, boolean connectToDnViaHostname)
+    throws IOException {
     
     LocalDatanodeInfo localDatanodeInfo =  getLocalDatanodeInfo(node.getIpcPort());
     // check the cache first
     BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
     if (pathinfo == null) {
-      pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token);
+      pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token, connectToDnViaHostname);
     }
 
     // check to see if the file exists. It may so happen that the
@@ -216,11 +217,12 @@ class BlockReaderLocal extends FSInputCh
   
   private static BlockLocalPathInfo getBlockPathInfo(Block blk,
       DatanodeInfo node, Configuration conf, int timeout,
-      Token<BlockTokenIdentifier> token) throws IOException {
+      Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname) 
+      throws IOException {
     LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.ipcPort);
     BlockLocalPathInfo pathinfo = null;
     ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node,
-        conf, timeout);
+        conf, timeout, connectToDnViaHostname);
     try {
       // make RPC to local datanode to find local pathnames of blocks
       pathinfo = proxy.getBlockLocalPathInfo(blk, token);

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java Fri Jun 21 06:37:27 2013
@@ -22,7 +22,11 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
 
+import org.apache.commons.io.input.BoundedInputStream;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
 
@@ -54,20 +58,20 @@ public abstract class ByteRangeInputStre
       return url;
     }
 
-    protected abstract HttpURLConnection openConnection() throws IOException;
-
-    protected abstract HttpURLConnection openConnection(final long offset) throws IOException;
+    /** Connect to server with a data offset. */
+    protected abstract HttpURLConnection connect(final long offset,
+        final boolean resolved) throws IOException;
   }
 
   enum StreamStatus {
-    NORMAL, SEEK
+    NORMAL, SEEK, CLOSED
   }
   protected InputStream in;
   protected URLOpener originalURL;
   protected URLOpener resolvedURL;
   protected long startPos = 0;
   protected long currentPos = 0;
-  protected long filelength;
+  protected Long fileLength = null;
 
   StreamStatus status = StreamStatus.SEEK;
 
@@ -82,72 +86,123 @@ public abstract class ByteRangeInputStre
     this.resolvedURL = r;
   }
   
-  protected abstract void checkResponseCode(final HttpURLConnection connection
-      ) throws IOException;
-  
   protected abstract URL getResolvedUrl(final HttpURLConnection connection
       ) throws IOException;
 
-  private InputStream getInputStream() throws IOException {
-    if (status != StreamStatus.NORMAL) {
-      
-      if (in != null) {
-        in.close();
-        in = null;
-      }
-      
-      // Use the original url if no resolved url exists, eg. if
-      // it's the first time a request is made.
-      final URLOpener opener =
-        (resolvedURL.getURL() == null) ? originalURL : resolvedURL;
-
-      final HttpURLConnection connection = opener.openConnection(startPos);
-      connection.connect();
-      checkResponseCode(connection);
-
+  protected InputStream getInputStream() throws IOException {
+    switch (status) {
+      case NORMAL:
+        break;
+      case SEEK:
+        if (in != null) {
+          in.close();
+        }
+        in = openInputStream();
+        status = StreamStatus.NORMAL;
+        break;
+      case CLOSED:
+        throw new IOException("Stream closed");
+    }
+    return in;
+  }
+  
+  protected InputStream openInputStream() throws IOException {
+    // Use the original url if no resolved url exists, eg. if
+    // it's the first time a request is made.
+    final boolean resolved = resolvedURL.getURL() != null; 
+    final URLOpener opener = resolved? resolvedURL: originalURL;
+
+    final HttpURLConnection connection = opener.connect(startPos, resolved);
+    resolvedURL.setURL(getResolvedUrl(connection));
+
+    InputStream in = connection.getInputStream();
+    final Map<String, List<String>> headers = connection.getHeaderFields();
+    if (isChunkedTransferEncoding(headers)) {
+      // file length is not known
+      fileLength = null;
+    } else {
+      // for non-chunked transfer-encoding, get content-length
       final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
-      filelength = (cl == null) ? -1 : Long.parseLong(cl);
-      in = connection.getInputStream();
+      if (cl == null) {
+        throw new IOException(StreamFile.CONTENT_LENGTH + " is missing: "
+            + headers);
+      }
+      final long streamlength = Long.parseLong(cl);
+      fileLength = startPos + streamlength;
 
-      resolvedURL.setURL(getResolvedUrl(connection));
-      status = StreamStatus.NORMAL;
+      // Java has a bug with >2GB request streams.  It won't bounds check
+      // the reads so the transfer blocks until the server times out
+      in = new BoundedInputStream(in, streamlength);
     }
-    
+
     return in;
   }
   
-  private void update(final boolean isEOF, final int n)
-      throws IOException {
-    if (!isEOF) {
+  private static boolean isChunkedTransferEncoding(
+      final Map<String, List<String>> headers) {
+    return contains(headers, "Transfer-Encoding", "chunked")
+        || contains(headers, "TE", "chunked");
+  }
+
+  /** Does the HTTP header map contain the given key, value pair? */
+  private static boolean contains(final Map<String, List<String>> headers,
+      final String key, final String value) {
+    final List<String> values = headers.get(key);
+    if (values != null) {
+      for(String v : values) {
+        for(final StringTokenizer t = new StringTokenizer(v, ",");
+            t.hasMoreTokens(); ) {
+          if (value.equalsIgnoreCase(t.nextToken())) {
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
+  
+  private int update(final int n) throws IOException {
+    if (n != -1) {
       currentPos += n;
-    } else if (currentPos < filelength) {
+    } else if (fileLength != null && currentPos < fileLength) {
       throw new IOException("Got EOF but currentPos = " + currentPos
-          + " < filelength = " + filelength);
+          + " < filelength = " + fileLength);
     }
+    return n;
   }
 
+  @Override
   public int read() throws IOException {
     final int b = getInputStream().read();
-    update(b == -1, 1);
+    update((b == -1)? -1 : 1);
     return b;
   }
   
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException{
+    return update(getInputStream().read(b, off, len));
+  }
+  
   /**
    * Seek to the given offset from the start of the file.
    * The next read() will be from that location.  Can't
    * seek past the end of the file.
    */
+  @Override
   public void seek(long pos) throws IOException {
     if (pos != currentPos) {
       startPos = pos;
       currentPos = pos;
-      status = StreamStatus.SEEK;
+      if (status != StreamStatus.CLOSED) {
+        status = StreamStatus.SEEK;
+      }
     }
   }
 
   /**
    * Return the current offset from the start of the file
    */
+  @Override
   public long getPos() throws IOException {
     return currentPos;
   }
@@ -156,7 +211,17 @@ public abstract class ByteRangeInputStre
    * Seeks a different copy of the data.  Returns true if
    * found a new source, false otherwise.
    */
+  @Override
   public boolean seekToNewSource(long targetPos) throws IOException {
     return false;
   }
-}
\ No newline at end of file
+  
+  @Override
+  public void close() throws IOException {
+    if (in != null) {
+      in.close();
+      in = null;
+    }
+    status = StreamStatus.CLOSED;
+  }
+}
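
The reworked openInputStream() above only trusts Content-Length when the response is not chunk-encoded, and then wraps the connection's stream in commons-io's BoundedInputStream so a large (>2GB) range request cannot read past the advertised length. A minimal sketch of that wrapping pattern in isolation, assuming commons-io is on the classpath (the URL and class name are hypothetical, not part of this change):

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import org.apache.commons.io.input.BoundedInputStream;

public class BoundedHttpRead {
  static InputStream open(String address) throws IOException {
    HttpURLConnection conn = (HttpURLConnection) new URL(address).openConnection();
    conn.connect();
    String cl = conn.getHeaderField("Content-Length");
    InputStream in = conn.getInputStream();
    if (cl != null) {
      // Bound the stream to the advertised length; works around JVMs that do
      // not bounds-check very large response bodies.
      in = new BoundedInputStream(in, Long.parseLong(cl));
    }
    return in;
  }
}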

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Jun 21 06:37:27 2013
@@ -21,10 +21,12 @@ import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.io.retry.RetryUtils;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.*;
@@ -39,6 +41,7 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -59,6 +62,8 @@ import java.nio.ByteBuffer;
 
 import javax.net.SocketFactory;
 
+import sun.net.util.IPAddressUtil;
+
 /********************************************************
  * DFSClient can connect to a Hadoop Filesystem and 
  * perform basic file tasks.  It uses the ClientProtocol
@@ -75,13 +80,12 @@ public class DFSClient implements FSCons
   public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
   private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
   public final ClientProtocol namenode;
-  private final ClientProtocol rpcNamenode;
+  final ClientProtocol rpcNamenode;
   private final InetSocketAddress nnAddress;
   final UserGroupInformation ugi;
   volatile boolean clientRunning = true;
-  Random r = new Random();
+  static Random r = new Random();
   final String clientName;
-  final LeaseChecker leasechecker = new LeaseChecker();
   private Configuration conf;
   private long defaultBlockSize;
   private short defaultReplication;
@@ -93,6 +97,8 @@ public class DFSClient implements FSCons
   private final FileSystem.Statistics stats;
   private int maxBlockAcquireFailures;
   private boolean shortCircuitLocalReads;
+  private boolean connectToDnViaHostname;
+  private SocketAddress[] localInterfaceAddrs;
 
   /**
    * We assume we're talking to another CDH server, which supports
@@ -101,7 +107,18 @@ public class DFSClient implements FSCons
    */
   private volatile boolean serverSupportsHdfs630 = true;
   private volatile boolean serverSupportsHdfs200 = true;
- 
+  final int hdfsTimeout;    // timeout value for a DFS operation.
+  private final String authority;
+
+  /**
+   * A map from file names to {@link DFSOutputStream} objects
+   * that are currently being written by this client.
+   * Note that a file can only be written by a single client.
+   */
+  private final Map<String, DFSOutputStream> filesBeingWritten
+      = new HashMap<String, DFSOutputStream>();
+
+  /** Create a {@link NameNode} proxy */ 
   public static ClientProtocol createNamenode(Configuration conf) throws IOException {
     return createNamenode(NameNode.getAddress(conf), conf);
   }
@@ -109,7 +126,7 @@ public class DFSClient implements FSCons
   public static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr,
       Configuration conf) throws IOException {
     return createNamenode(createRPCNamenode(nameNodeAddr, conf,
-      UserGroupInformation.getCurrentUser()));
+      UserGroupInformation.getCurrentUser()), conf);
   }
 
   private static ClientProtocol createRPCNamenode(InetSocketAddress nameNodeAddr,
@@ -117,11 +134,32 @@ public class DFSClient implements FSCons
     throws IOException {
     return (ClientProtocol)RPC.getProxy(ClientProtocol.class,
         ClientProtocol.versionID, nameNodeAddr, ugi, conf,
-        NetUtils.getSocketFactory(conf, ClientProtocol.class));
-  }
+        NetUtils.getSocketFactory(conf, ClientProtocol.class), 0,
+        RetryUtils.getMultipleLinearRandomRetry(
+                conf, 
+                DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
+                DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
+                DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
+                DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT
+                ),
+        false);  
+    }
 
-  private static ClientProtocol createNamenode(ClientProtocol rpcNamenode)
-    throws IOException {
+  private static ClientProtocol createNamenode(ClientProtocol rpcNamenode,
+      Configuration conf) throws IOException {
+    //default policy
+    @SuppressWarnings("unchecked")
+    final RetryPolicy defaultPolicy = 
+        RetryUtils.getDefaultRetryPolicy(
+            conf, 
+            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
+            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
+            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
+            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
+            SafeModeException.class
+            );
+    
+    //create policy
     RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
         5, LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
     
@@ -133,23 +171,27 @@ public class DFSClient implements FSCons
       new HashMap<Class<? extends Exception>, RetryPolicy>();
     exceptionToPolicyMap.put(RemoteException.class, 
         RetryPolicies.retryByRemoteException(
-            RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap));
+            defaultPolicy, remoteExceptionToPolicyMap));
     RetryPolicy methodPolicy = RetryPolicies.retryByException(
-        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
+        defaultPolicy, exceptionToPolicyMap);
     Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
     
     methodNameToPolicyMap.put("create", methodPolicy);
 
-    return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
-        rpcNamenode, methodNameToPolicyMap);
+    final ClientProtocol cp = (ClientProtocol) RetryProxy.create(ClientProtocol.class,
+        rpcNamenode, defaultPolicy, methodNameToPolicyMap);
+    RPC.checkVersion(ClientProtocol.class, ClientProtocol.versionID, cp);
+    return cp;
   }
 
   /** Create {@link ClientDatanodeProtocol} proxy with block/token */
   static ClientDatanodeProtocol createClientDatanodeProtocolProxy (
-      DatanodeID datanodeid, Configuration conf, 
-      Block block, Token<BlockTokenIdentifier> token, int socketTimeout) throws IOException {
-    InetSocketAddress addr = NetUtils.makeSocketAddr(
-      datanodeid.getHost(), datanodeid.getIpcPort());
+      DatanodeInfo di, Configuration conf, 
+      Block block, Token<BlockTokenIdentifier> token, int socketTimeout,
+      boolean connectToDnViaHostname) throws IOException {
+    final String dnName = di.getNameWithIpcPort(connectToDnViaHostname);
+    LOG.debug("Connecting to " + dnName);
+    InetSocketAddress addr = NetUtils.createSocketAddr(dnName);
     if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
       ClientDatanodeProtocol.LOG.info("ClientDatanodeProtocol addr=" + addr);
     }
@@ -163,10 +205,11 @@ public class DFSClient implements FSCons
         
   /** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */
   static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
-      DatanodeID datanodeid, Configuration conf, int socketTimeout)
-      throws IOException {
-    InetSocketAddress addr = NetUtils.createSocketAddr(
-      datanodeid.getHost() + ":" + datanodeid.getIpcPort());
+      DatanodeInfo di, Configuration conf, int socketTimeout,
+      boolean connectToDnViaHostname) throws IOException {
+    final String dnName = di.getNameWithIpcPort(connectToDnViaHostname);
+    LOG.debug("Connecting to " + dnName);
+    InetSocketAddress addr = NetUtils.createSocketAddr(dnName);
     if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
       ClientDatanodeProtocol.LOG.info("ClientDatanodeProtocol addr=" + addr);
     }
@@ -222,20 +265,20 @@ public class DFSClient implements FSCons
     this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
     this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);
 
+    this.hdfsTimeout = Client.getTimeout(conf);
     ugi = UserGroupInformation.getCurrentUser();
+    this.authority = nameNodeAddr == null? "null":
+      nameNodeAddr.getHostName() + ":" + nameNodeAddr.getPort();
+    String taskId = conf.get("mapred.task.id", "NONMAPREDUCE");
+    this.clientName = "DFSClient_" + taskId + "_" + 
+        r.nextInt()  + "_" + Thread.currentThread().getId();
 
-    String taskId = conf.get("mapred.task.id");
-    if (taskId != null) {
-      this.clientName = "DFSClient_" + taskId; 
-    } else {
-      this.clientName = "DFSClient_" + r.nextInt();
-    }
     defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
     defaultReplication = (short) conf.getInt("dfs.replication", 3);
 
     if (nameNodeAddr != null && rpcNamenode == null) {
       this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
-      this.namenode = createNamenode(this.rpcNamenode);
+      this.namenode = createNamenode(this.rpcNamenode, conf);
     } else if (nameNodeAddr == null && rpcNamenode != null) {
       //This case is used for testing.
       this.namenode = this.rpcNamenode = rpcNamenode;
@@ -251,6 +294,23 @@ public class DFSClient implements FSCons
     if (LOG.isDebugEnabled()) {
       LOG.debug("Short circuit read is " + shortCircuitLocalReads);
     }
+    this.connectToDnViaHostname = conf.getBoolean(
+        DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME,
+        DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Connect to datanode via hostname is " + connectToDnViaHostname);
+    }
+    String localInterfaces[] =
+      conf.getStrings(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES);
+    if (null == localInterfaces) {
+      localInterfaces = new String[0];
+    }
+    this.localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
+    if (LOG.isDebugEnabled() && 0 != localInterfaces.length) {
+      LOG.debug("Using local interfaces [" +
+          StringUtils.join(",",localInterfaces)+ "] with addresses [" +
+          StringUtils.join(",",localInterfaceAddrs) + "]");
+    }
   }
 
   static int getMaxBlockAcquireFailures(Configuration conf) {
@@ -264,20 +324,116 @@ public class DFSClient implements FSCons
       throw result;
     }
   }
+
+  /** Return the lease renewer instance. The renewer thread won't start
+   *  until the first output stream is created. The same instance will
+   *  be returned until all output streams are closed.
+   */
+  public synchronized LeaseRenewer getLeaseRenewer() throws IOException {
+      return LeaseRenewer.getInstance(authority, ugi, this);
+  }
+
+  /** Get a lease and start automatic renewal */
+  private void beginFileLease(final String src, final DFSOutputStream out) 
+      throws IOException {
+    getLeaseRenewer().put(src, out, this);
+  }
+
+  /** Stop renewal of lease for the file. */
+  void endFileLease(final String src) throws IOException {
+    getLeaseRenewer().closeFile(src, this);
+  }
     
+
+  /** Put a file. Only called from LeaseRenewer, where proper locking is
+   *  enforced to consistently update its local dfsclients array and 
+   *  client's filesBeingWritten map.
+   */
+  void putFileBeingWritten(final String src, final DFSOutputStream out) {
+    synchronized(filesBeingWritten) {
+      filesBeingWritten.put(src, out);
+    }
+  }
+
+  /** Remove a file. Only called from LeaseRenewer. */
+  void removeFileBeingWritten(final String src) {
+    synchronized(filesBeingWritten) {
+      filesBeingWritten.remove(src);
+    }
+  }
+
+  /** Is file-being-written map empty? */
+  boolean isFilesBeingWrittenEmpty() {
+    synchronized(filesBeingWritten) {
+      return filesBeingWritten.isEmpty();
+    }
+  }
+
+  /**
+   * Renew leases.
+   * @return true if lease was renewed. May return false if this
+   * client has been closed or has no files open.
+   **/
+  boolean renewLease() throws IOException {
+    if (clientRunning && !isFilesBeingWrittenEmpty()) {
+      namenode.renewLease(clientName);
+      return true;
+    }
+    return false;
+  }
+
+  /** Abort and release resources held.  Ignore all errors. */
+  void abort() {
+    clientRunning = false;
+    closeAllFilesBeingWritten(true);
+    
+    try {
+      // remove reference to this client and stop the renewer,
+      // if there is no more clients under the renewer.
+      getLeaseRenewer().closeClient(this);
+    } catch (IOException ioe) {
+      LOG.info("Exception occurred while aborting the client. " + ioe);
+    }
+    RPC.stopProxy(rpcNamenode); // close connections to the namenode
+  }
+
+  /** Close/abort all files being written. */
+  private void closeAllFilesBeingWritten(final boolean abort) {
+    for(;;) {
+      final String src;
+      final DFSOutputStream out;
+      synchronized(filesBeingWritten) {
+        if (filesBeingWritten.isEmpty()) {
+          return;
+        }
+        src = filesBeingWritten.keySet().iterator().next();
+        out = filesBeingWritten.remove(src);
+      }
+      if (out != null) {
+        try {
+          if (abort) {
+            out.abort();
+          } else {
+            out.close();
+          }
+        } catch(IOException ie) {
+          LOG.error("Failed to " + (abort? "abort": "close") + " file " + src,
+              ie);
+        }
+      }
+    }
+  }
+
   /**
    * Close the file system, abandoning all of the leases and files being
    * created and close connections to the namenode.
    */
   public synchronized void close() throws IOException {
     if(clientRunning) {
-      leasechecker.close();
+      closeAllFilesBeingWritten(false);
       clientRunning = false;
-      try {
-        leasechecker.interruptAndJoin();
-      } catch (InterruptedException ie) {
-      }
-  
+
+      getLeaseRenewer().closeClient(this);
       // close connections to the namenode
       RPC.stopProxy(rpcNamenode);
     }
@@ -349,14 +505,14 @@ public class DFSClient implements FSCons
   /**
    * Get {@link BlockReader} for short circuited local reads.
    */
-  private static BlockReader getLocalBlockReader(Configuration conf,
+  private BlockReader getLocalBlockReader(Configuration conf,
       String src, Block blk, Token<BlockTokenIdentifier> accessToken,
       DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock)
       throws InvalidToken, IOException {
     try {
       return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken,
           chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes()
-              - offsetIntoBlock);
+              - offsetIntoBlock, connectToDnViaHostname);
     } catch (RemoteException re) {
       throw re.unwrapRemoteException(InvalidToken.class,
           AccessControlException.class);
@@ -711,10 +867,10 @@ public class DFSClient implements FSCons
     }
     FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
     LOG.debug(src + ": masked=" + masked);
-    OutputStream result = new DFSOutputStream(src, masked,
+    final DFSOutputStream result = new DFSOutputStream(src, masked,
         overwrite, createParent, replication, blockSize, progress, buffersize,
         conf.getInt("io.bytes.per.checksum", 512));
-    leasechecker.put(src, result);
+    beginFileLease(src, result);
     return result;
   }
 
@@ -736,6 +892,20 @@ public class DFSClient implements FSCons
   }
   
   /**
+   * Close status of a file
+   * @return true if file is already closed
+   */
+  public boolean isFileClosed(String src) throws IOException{
+    checkOpen();
+    try {
+      return namenode.isFileClosed(src);
+    } catch(RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class,
+                                     FileNotFoundException.class);
+    }
+  }
+
+  /**
    * Append to an existing HDFS file.  
    * 
    * @param src file name
@@ -769,7 +939,7 @@ public class DFSClient implements FSCons
     }
     final DFSOutputStream result = new DFSOutputStream(src, buffersize, progress,
         lastBlock, stat, conf.getInt("io.bytes.per.checksum", 512));
-    leasechecker.put(src, result);
+    beginFileLease(src, result);
     return result;
   }
 
@@ -792,6 +962,19 @@ public class DFSClient implements FSCons
                                      DSQuotaExceededException.class);
     }
   }
+  
+  /**
+   * Move blocks from src to trg and delete src
+   * See {@link ClientProtocol#concat(String, String [])}. 
+   */
+  public void concat(String trg, String [] srcs) throws IOException {
+    checkOpen();
+    try {
+      namenode.concat(trg, srcs);
+    } catch(RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class);
+    }
+  }
 
   /**
    * Rename file or directory.
@@ -880,6 +1063,60 @@ public class DFSClient implements FSCons
   }
 
   /**
+   * Return the socket addresses to use with each configured
+   * local interface. Local interfaces may be specified by IP
+   * address, IP address range using CIDR notation, interface
+   * name (e.g. eth0) or sub-interface name (e.g. eth0:0).
+   * The socket addresses consist of the IPs for the interfaces
+   * and the ephemeral port (port 0). If an IP, IP range, or
+   * interface name matches an interface with sub-interfaces
+   * only the IP of the interface is used. Sub-interfaces can
+   * be used by specifying them explicitly (by IP or name).
+   * 
+   * @return SocketAddresses for the configured local interfaces,
+   *    or an empty array if none are configured
+   * @throws UnknownHostException if a given interface name is invalid
+   */
+  private static SocketAddress[] getLocalInterfaceAddrs(
+        String interfaceNames[]) throws UnknownHostException {
+    List<SocketAddress> localAddrs = new ArrayList<SocketAddress>();
+    for (String interfaceName : interfaceNames) {
+      if (IPAddressUtil.isIPv4LiteralAddress(interfaceName)) {
+        localAddrs.add(new InetSocketAddress(interfaceName, 0));
+      } else if (NetUtils.isValidSubnet(interfaceName)) {
+        for (InetAddress addr : NetUtils.getIPs(interfaceName, false)) {
+          localAddrs.add(new InetSocketAddress(addr, 0));
+        }
+      } else {
+        for (String ip : DNS.getIPs(interfaceName, false)) {
+          localAddrs.add(new InetSocketAddress(ip, 0));
+        }
+      }
+    }
+    return localAddrs.toArray(new SocketAddress[localAddrs.size()]);
+  }
+
+  /**
+   * Select one of the configured local interfaces at random. We use a random
+   * interface because other policies like round-robin are less effective
+   * given that we cache connections to datanodes.
+   *
+   * @return one of the local interface addresses at random, or null if no
+   *    local interfaces are configured
+   */
+  private SocketAddress getRandomLocalInterfaceAddr() {
+    if (localInterfaceAddrs.length == 0) {
+      return null;
+    }
+    final int idx = r.nextInt(localInterfaceAddrs.length);
+    final SocketAddress addr = localInterfaceAddrs[idx];
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Using local interface " + addr);
+    }
+    return addr;
+  }
+
+  /**
    * Get the checksum of a file.
    * @param src The file path
    * @return The checksum 
@@ -887,7 +1124,7 @@ public class DFSClient implements FSCons
    */
   public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
     checkOpen();
-    return getFileChecksum(src, namenode, socketFactory, socketTimeout);    
+    return getFileChecksum(src, namenode, socketFactory, socketTimeout, connectToDnViaHostname);    
   }
 
   /**
@@ -898,6 +1135,12 @@ public class DFSClient implements FSCons
   public static MD5MD5CRC32FileChecksum getFileChecksum(String src,
       ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout
       ) throws IOException {
+    return getFileChecksum(src, namenode, socketFactory, socketTimeout, false);
+  }
+
+  private static MD5MD5CRC32FileChecksum getFileChecksum(String src,
+      ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
+      boolean connectToDnViaHostname) throws IOException {
     //get all block locations
     LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
     if (null == blockLocations) {
@@ -930,6 +1173,7 @@ public class DFSClient implements FSCons
      
       boolean done = false;
       for(int j = 0; !done && j < datanodes.length; j++) {
+        final String dnName = datanodes[j].getName(connectToDnViaHostname);
         Socket sock = null;
         DataOutputStream out = null;
         DataInputStream in = null;
@@ -937,17 +1181,17 @@ public class DFSClient implements FSCons
         try {
           //connect to a datanode
           sock = socketFactory.createSocket();
-          NetUtils.connect(sock,
-              NetUtils.createSocketAddr(datanodes[j].getName()), timeout);
+          LOG.debug("Connecting to " + dnName);
+          NetUtils.connect(sock, NetUtils.createSocketAddr(dnName), timeout);
           sock.setSoTimeout(timeout);
 
           out = new DataOutputStream(
-              new BufferedOutputStream(NetUtils.getOutputStream(sock), 
+              new BufferedOutputStream(NetUtils.getOutputStream(sock),
                                        DataNode.SMALL_BUFFER_SIZE));
           in = new DataInputStream(NetUtils.getInputStream(sock));
 
           if (LOG.isDebugEnabled()) {
-            LOG.debug("write to " + datanodes[j].getName() + ": "
+            LOG.debug("write to " + dnName + ": "
                 + DataTransferProtocol.OP_BLOCK_CHECKSUM + ", block=" + block);
           }
 
@@ -966,7 +1210,7 @@ public class DFSClient implements FSCons
               if (LOG.isDebugEnabled()) {
                 LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
                     + "for file " + src + " for block " + block
-                    + " from datanode " + datanodes[j].getName()
+                    + " from datanode " + dnName
                     + ". Will retry the block once.");
               }
               lastRetriedIndex = i;
@@ -976,7 +1220,7 @@ public class DFSClient implements FSCons
               break;
             } else {
               throw new IOException("Bad response " + reply + " for block "
-                  + block + " from datanode " + datanodes[j].getName());
+                  + block + " from datanode " + dnName);
             }
           }
 
@@ -1007,12 +1251,10 @@ public class DFSClient implements FSCons
               LOG.debug("set bytesPerCRC=" + bytesPerCRC
                   + ", crcPerBlock=" + crcPerBlock);
             }
-            LOG.debug("got reply from " + datanodes[j].getName()
-                + ": md5=" + md5);
+            LOG.debug("got reply from " + dnName + ": md5=" + md5);
           }
         } catch (IOException ie) {
-          LOG.warn("src=" + src + ", datanodes[" + j + "].getName()="
-              + datanodes[j].getName(), ie);
+          LOG.warn("src=" + src + ", datanodes[" + j + "]=" + dnName, ie);
         } finally {
           IOUtils.closeStream(in);
           IOUtils.closeStream(out);
@@ -1287,118 +1529,7 @@ public class DFSClient implements FSCons
     throw new IOException("No live nodes contain current block");
   }
 
-  boolean isLeaseCheckerStarted() {
-    return leasechecker.daemon != null;
-  }
-
-  /** Lease management*/
-  class LeaseChecker implements Runnable {
-    /** A map from src -> DFSOutputStream of files that are currently being
-     * written by this client.
-     */
-    private final SortedMap<String, OutputStream> pendingCreates
-        = new TreeMap<String, OutputStream>();
-
-    private Daemon daemon = null;
-    
-    synchronized void put(String src, OutputStream out) {
-      if (clientRunning) {
-        if (daemon == null) {
-          daemon = new Daemon(this);
-          daemon.start();
-        }
-        pendingCreates.put(src, out);
-      }
-    }
-    
-    synchronized void remove(String src) {
-      pendingCreates.remove(src);
-    }
-    
-    void interruptAndJoin() throws InterruptedException {
-      Daemon daemonCopy = null;
-      synchronized (this) {
-        if (daemon != null) {
-          daemon.interrupt();
-          daemonCopy = daemon;
-        }
-      }
-     
-      if (daemonCopy != null) {
-        LOG.debug("Wait for lease checker to terminate");
-        daemonCopy.join();
-      }
-    }
-
-    void close() {
-      while (true) {
-        String src;
-        OutputStream out;
-        synchronized (this) {
-          if (pendingCreates.isEmpty()) {
-            return;
-          }
-          src = pendingCreates.firstKey();
-          out = pendingCreates.remove(src);
-        }
-        if (out != null) {
-          try {
-            out.close();
-          } catch (IOException ie) {
-            LOG.error("Exception closing file " + src+ " : " + ie, ie);
-          }
-        }
-      }
-    }
-
-    private void renew() throws IOException {
-      synchronized(this) {
-        if (pendingCreates.isEmpty()) {
-          return;
-        }
-      }
-      namenode.renewLease(clientName);
-    }
-
-    /**
-     * Periodically check in with the namenode and renew all the leases
-     * when the lease period is half over.
-     */
-    public void run() {
-      long lastRenewed = 0;
-      while (clientRunning && !Thread.interrupted()) {
-        if (System.currentTimeMillis() - lastRenewed > (LEASE_SOFTLIMIT_PERIOD / 2)) {
-          try {
-            renew();
-            lastRenewed = System.currentTimeMillis();
-          } catch (IOException ie) {
-            LOG.warn("Problem renewing lease for " + clientName, ie);
-          }
-        }
-
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException ie) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(this + " is interrupted.", ie);
-          }
-          return;
-        }
-      }
-    }
-
-    /** {@inheritDoc} */
-    public String toString() {
-      String s = getClass().getSimpleName();
-      if (LOG.isTraceEnabled()) {
-        return s + "@" + DFSClient.this + ": "
-               + StringUtils.stringifyException(new Throwable("for testing"));
-      }
-      return s;
-    }
-  }
-
-  /** Utility class to encapsulate data node info and its ip address. */
+  /** Utility class to encapsulate data node info and its address. */
   private static class DNAddrPair {
     DatanodeInfo info;
     InetSocketAddress addr;
@@ -1840,79 +1971,136 @@ public class DFSClient implements FSCons
      * Grab the open-file info from namenode
      */
     synchronized void openInfo() throws IOException {
-      LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
+      for (int retries = 3; retries > 0; retries--) {
+        if (fetchLocatedBlocks()) {
+          // fetch block success
+          return;
+        } else {
+          // Last block location unavailable. When a cluster restarts,
+          // DNs may not report immediately. At this time partial block
+          // locations will not be available with NN for getting the length.
+          // Lets retry a few times to get the length.
+          DFSClient.LOG.warn("Last block locations unavailable. "
+              + "Datanodes might not have reported blocks completely."
+              + " Will retry for " + retries + " times");
+          waitFor(4000);
+        }
+      }
+      throw new IOException("Could not obtain the last block locations.");
+    }
+    
+    private void waitFor(int waitTime) throws InterruptedIOException {
+      try {
+        Thread.sleep(waitTime);
+      } catch (InterruptedException e) {
+        throw new InterruptedIOException(
+            "Interrupted while getting the last block length.");
+      }
+    }
+
+    private boolean fetchLocatedBlocks() throws IOException,
+        FileNotFoundException {
+      LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0,
+          prefetchSize);
       if (newInfo == null) {
         throw new FileNotFoundException("File does not exist: " + src);
       }
 
-      // I think this check is not correct. A file could have been appended to
-      // between two calls to openInfo().
-      if (locatedBlocks != null && !locatedBlocks.isUnderConstruction() &&
-          !newInfo.isUnderConstruction()) {
-        Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
+      if (locatedBlocks != null && !locatedBlocks.isUnderConstruction()
+          && !newInfo.isUnderConstruction()) {
+        Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks()
+            .iterator();
         Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
         while (oldIter.hasNext() && newIter.hasNext()) {
-          if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
+          if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) {
             throw new IOException("Blocklist for " + src + " has changed!");
           }
         }
       }
-      updateBlockInfo(newInfo);
+      boolean isBlkInfoUpdated = updateBlockInfo(newInfo);
       this.locatedBlocks = newInfo;
       this.currentNode = null;
+      return isBlkInfoUpdated;
     }
     
     /** 
      * For files under construction, update the last block size based
      * on the length of the block from the datanode.
      */
-    private void updateBlockInfo(LocatedBlocks newInfo) {
+    private boolean updateBlockInfo(LocatedBlocks newInfo) throws IOException {
       if (!serverSupportsHdfs200 || !newInfo.isUnderConstruction()
           || !(newInfo.locatedBlockCount() > 0)) {
-        return;
+        return true;
       }
 
       LocatedBlock last = newInfo.get(newInfo.locatedBlockCount() - 1);
       boolean lastBlockInFile = (last.getStartOffset() + last.getBlockSize() == newInfo
           .getFileLength());
-      if (!lastBlockInFile || last.getLocations().length <= 0) {
-        return;
+      if (!lastBlockInFile) {
+        return true;
+      }
+      
+      if (last.getLocations().length == 0) {
+        return false;
       }
+      
       ClientDatanodeProtocol primary = null;
-      DatanodeInfo primaryNode = last.getLocations()[0];
-      try {
-        primary = createClientDatanodeProtocolProxy(primaryNode, conf,
-            last.getBlock(), last.getBlockToken(), socketTimeout);
-        Block newBlock = primary.getBlockInfo(last.getBlock());
-        long newBlockSize = newBlock.getNumBytes();
-        long delta = newBlockSize - last.getBlockSize();
-        // if the size of the block on the datanode is different
-        // from what the NN knows about, the datanode wins!
-        last.getBlock().setNumBytes(newBlockSize);
-        long newlength = newInfo.getFileLength() + delta;
-        newInfo.setFileLength(newlength);
-        LOG.debug("DFSClient setting last block " + last + " to length "
-            + newBlockSize + " filesize is now " + newInfo.getFileLength());
-      } catch (IOException e) {
-        if (e.getMessage().startsWith(
-            "java.io.IOException: java.lang.NoSuchMethodException: "
-                + "org.apache.hadoop.hdfs.protocol"
-                + ".ClientDatanodeProtocol.getBlockInfo")) {
-          // We're talking to a server that doesn't implement HDFS-200.
-          serverSupportsHdfs200 = false;
-        } else {
-          LOG.debug("DFSClient file " + src
-              + " is being concurrently append to" + " but datanode "
-              + primaryNode.getHostName() + " probably does not have block "
-              + last.getBlock());
+      Block newBlock = null;
+      for (int i = 0; i < last.getLocations().length && newBlock == null; i++) {
+        DatanodeInfo datanode = last.getLocations()[i];
+        try {
+          primary = createClientDatanodeProtocolProxy(datanode, conf, last
+              .getBlock(), last.getBlockToken(), socketTimeout,
+              connectToDnViaHostname);
+          newBlock = primary.getBlockInfo(last.getBlock());
+        } catch (IOException e) {
+          if (e.getMessage().startsWith(
+              "java.io.IOException: java.lang.NoSuchMethodException: "
+                  + "org.apache.hadoop.hdfs.protocol"
+                  + ".ClientDatanodeProtocol.getBlockInfo")) {
+            // We're talking to a server that doesn't implement HDFS-200.
+            serverSupportsHdfs200 = false;
+          } else {
+            LOG.info("Failed to get block info from "
+                + datanode.getHostName() + "; it probably does not have "
+                + last.getBlock(), e);
+          }
+        } finally {
+          if (primary != null) {
+            RPC.stopProxy(primary);
+          }
         }
       }
+      
+      if (newBlock == null) {
+        if (!serverSupportsHdfs200) {
+          return true;
+        }
+        throw new IOException(
+            "Failed to get block info from any of the DNs in the pipeline: "
+                + Arrays.toString(last.getLocations()));
+      }
+      
+      long newBlockSize = newBlock.getNumBytes();
+      long delta = newBlockSize - last.getBlockSize();
+      // if the size of the block on the datanode is different
+      // from what the NN knows about, the datanode wins!
+      last.getBlock().setNumBytes(newBlockSize);
+      long newlength = newInfo.getFileLength() + delta;
+      newInfo.setFileLength(newlength);
+      LOG.debug("DFSClient setting last block " + last + " to length "
+          + newBlockSize + " filesize is now " + newInfo.getFileLength());
+      return true;
     }
     
     public synchronized long getFileLength() {
       return (locatedBlocks == null) ? 0 : locatedBlocks.getFileLength();
     }
 
+    private synchronized boolean blockUnderConstruction() {
+      return locatedBlocks.isUnderConstruction();
+    }
+
     /**
      * Returns the datanode from which the stream is currently reading.
      */
@@ -2024,10 +2212,9 @@ public class DFSClient implements FSCons
 
     private boolean shouldTryShortCircuitRead(InetSocketAddress targetAddr)
         throws IOException {
-      if (shortCircuitLocalReads && isLocalAddress(targetAddr)) {
-        return true;
-      }
-      return false;
+      // Can't do a local read of a block under construction; see HDFS-2757
+      return shortCircuitLocalReads && !blockUnderConstruction()
+        && isLocalAddress(targetAddr);
     }
     
     /**
@@ -2086,7 +2273,7 @@ public class DFSClient implements FSCons
               fetchBlockAt(target);
               continue;
             } else {
-              LOG.info("Failed to read block " + targetBlock.getBlock()
+              LOG.info("Failed to read " + targetBlock.getBlock()
                   + " on local machine" + StringUtils.stringifyException(ex));
               LOG.info("Try reading via the datanode on " + targetAddr);
             }
@@ -2095,7 +2282,9 @@ public class DFSClient implements FSCons
 
         try {
           s = socketFactory.createSocket();
-          NetUtils.connect(s, targetAddr, socketTimeout);
+          LOG.debug("Connecting to " + targetAddr);
+          NetUtils.connect(s, targetAddr, getRandomLocalInterfaceAddr(), 
+              socketTimeout);
           s.setSoTimeout(socketTimeout);
           blockReader = RemoteBlockReader.newBlockReader(s, src, blk.getBlockId(), 
               accessToken, 
@@ -2225,7 +2414,7 @@ public class DFSClient implements FSCons
             if (pos > blockEnd) {
               currentNode = blockSeekTo(pos);
             }
-            int realLen = Math.min(len, (int) (blockEnd - pos + 1));
+            int realLen = (int) Math.min((long) len, (blockEnd - pos + 1L));
             int result = readBuffer(buf, off, realLen);
             
             if (result >= 0) {
@@ -2262,8 +2451,8 @@ public class DFSClient implements FSCons
         DatanodeInfo[] nodes = block.getLocations();
         try {
           DatanodeInfo chosenNode = bestNode(nodes, deadNodes);
-          InetSocketAddress targetAddr = 
-                            NetUtils.createSocketAddr(chosenNode.getName());
+          InetSocketAddress targetAddr =
+            NetUtils.createSocketAddr(chosenNode.getName(connectToDnViaHostname));
           return new DNAddrPair(chosenNode, targetAddr);
         } catch (IOException ie) {
           String blockInfo = block.getBlock() + " file=" + src;
@@ -2272,9 +2461,9 @@ public class DFSClient implements FSCons
           }
           
           if (nodes == null || nodes.length == 0) {
-            LOG.info("No node available for block: " + blockInfo);
+            LOG.info("No node available for: " + blockInfo);
           }
-          LOG.info("Could not obtain block " + block.getBlock()
+          LOG.info("Could not obtain " + block.getBlock()
               + " from any node: " + ie
               + ". Will get new block locations from namenode and retry...");
           try {
@@ -2326,7 +2515,9 @@ public class DFSClient implements FSCons
           } else {
             // go to the datanode
             dn = socketFactory.createSocket();
-            NetUtils.connect(dn, targetAddr, socketTimeout);
+            LOG.debug("Connecting to " + targetAddr);
+            NetUtils.connect(dn, targetAddr, getRandomLocalInterfaceAddr(),
+                socketTimeout);
             dn.setSoTimeout(socketTimeout);
             reader = RemoteBlockReader.newBlockReader(dn, src, 
                 block.getBlock().getBlockId(), accessToken,
@@ -3046,12 +3237,12 @@ public class DFSClient implements FSCons
         return false;
       }
       if (response != null) {
-        LOG.info("Error Recovery for block " + block +
+        LOG.info("Error Recovery for " + block +
                  " waiting for responder to exit. ");
         return true;
       }
       if (errorIndex >= 0) {
-        LOG.warn("Error Recovery for block " + block
+        LOG.warn("Error Recovery for " + block
             + " bad datanode[" + errorIndex + "] "
             + (nodes == null? "nodes == null": nodes[errorIndex].getName()));
       }
@@ -3124,7 +3315,7 @@ public class DFSClient implements FSCons
           // to each DN and two rpcs to the NN.
           int recoveryTimeout = (newnodes.length * 2 + 2) * socketTimeout;
           primary = createClientDatanodeProtocolProxy(primaryNode, conf, block,
-              accessToken, recoveryTimeout);
+                      accessToken, recoveryTimeout, connectToDnViaHostname);
           newBlock = primary.recoverBlock(block, isAppend, newnodes);
         } catch (IOException e) {
           LOG.warn("Failed recovery attempt #" + recoveryErrorCount +
@@ -3252,8 +3443,17 @@ public class DFSClient implements FSCons
       computePacketChunkSize(writePacketSize, bytesPerChecksum);
 
       try {
-        namenode.create(
-            src, masked, clientName, overwrite, createParent, replication, blockSize);
+        // Route the regular create() through the old create() signature so
+        // that newer clients (post-1.0) can talk to older clusters (pre-1.0),
+        // which lack the new create() method that accepts createParent as
+        // one of the arguments.
+        if (createParent) {
+          namenode.create(
+            src, masked, clientName, overwrite, replication, blockSize);
+        } else {
+          namenode.create(
+            src, masked, clientName, overwrite, false, replication, blockSize);
+        }
       } catch(RemoteException re) {
         throw re.unwrapRemoteException(AccessControlException.class,
                                        FileAlreadyExistsException.class,
@@ -3388,7 +3588,7 @@ public class DFSClient implements FSCons
         success = createBlockOutputStream(nodes, clientName, false);
 
         if (!success) {
-          LOG.info("Abandoning block " + block);
+          LOG.info("Abandoning " + block);
           namenode.abandonBlock(block, src, clientName);
 
           if (errorIndex < nodes.length) {
@@ -3425,16 +3625,19 @@ public class DFSClient implements FSCons
 
       boolean result = false;
       try {
-        LOG.debug("Connecting to " + nodes[0].getName());
-        InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
+        final String dnName = nodes[0].getName(connectToDnViaHostname);
+        InetSocketAddress target = NetUtils.createSocketAddr(dnName);
         s = socketFactory.createSocket();
-        timeoutValue = 3000 * nodes.length + socketTimeout;
-        NetUtils.connect(s, target, timeoutValue);
+        timeoutValue = (socketTimeout > 0) ?
+            (3000 * nodes.length + socketTimeout) : 0;
+        LOG.debug("Connecting to " + dnName);
+        NetUtils.connect(s, target, getRandomLocalInterfaceAddr(), timeoutValue);
         s.setSoTimeout(timeoutValue);
         s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
         LOG.debug("Send buf size " + s.getSendBufferSize());
-        long writeTimeout = HdfsConstants.WRITE_TIMEOUT_EXTENSION * nodes.length +
-                            datanodeWriteTimeout;
+        long writeTimeout = (datanodeWriteTimeout > 0) ?
+            (HdfsConstants.WRITE_TIMEOUT_EXTENSION * nodes.length +
+            datanodeWriteTimeout) : 0;
 
         //
         // Xmit header info to datanode
@@ -3817,12 +4020,12 @@ public class DFSClient implements FSCons
           throw e;
       }
       closeInternal();
-      leasechecker.remove(src);
       
       if (s != null) {
         s.close();
         s = null;
       }
+      endFileLease(src);
     }
     
     /**
@@ -3835,6 +4038,20 @@ public class DFSClient implements FSCons
       closed = true;
     }
  
+    /**
+     * Aborts this output stream and releases any system 
+     * resources associated with this stream.
+     */
+    synchronized void abort() throws IOException {
+      if (closed) {
+        return;
+      }
+      setLastException(new IOException("Lease timeout of "
+          + (hdfsTimeout / 1000) + " seconds expired."));
+      closeThreads();
+      endFileLease(src);
+    }
+ 
     // shutdown datastreamer and responseprocessor threads.
     private void closeThreads() throws IOException {
       try {
@@ -3903,10 +4120,20 @@ public class DFSClient implements FSCons
         while (!fileComplete) {
           fileComplete = namenode.complete(src, clientName);
           if (!fileComplete) {
+            if (!clientRunning ||
+                  (hdfsTimeout > 0 &&
+                   localstart + hdfsTimeout < System.currentTimeMillis())) {
+                String msg = "Unable to close file because dfsclient" +
+                              " was unable to contact the HDFS servers." +
+                              " clientRunning " + clientRunning +
+                              " hdfsTimeout " + hdfsTimeout;
+                LOG.info(msg);
+                throw new IOException(msg);
+            }
             try {
               Thread.sleep(400);
               if (System.currentTimeMillis() - localstart > 5000) {
-                LOG.info("Could not complete file " + src + " retrying...");
+                LOG.info("Could not complete " + src + " retrying...");
               }
             } catch (InterruptedException ie) {
             }
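
The retry that openInfo() now performs above is a plain bounded-retry pattern: attempt the fetch, sleep a fixed interval on failure, and give up with an IOException once the attempts are exhausted. A minimal standalone sketch of that pattern, assuming a hypothetical fetchOnce() operation (class and method names here are illustrative and not part of the patch):

import java.io.IOException;
import java.io.InterruptedIOException;

public class BoundedRetrySketch {

  /** One attempt at the operation; returns true on success. */
  interface Attempt {
    boolean fetchOnce() throws IOException;
  }

  /**
   * Try the operation up to 'retries' times, sleeping 'waitMs' between
   * attempts, mirroring the loop added to DFSInputStream.openInfo().
   */
  static void runWithRetries(Attempt attempt, int retries, long waitMs)
      throws IOException {
    for (int i = retries; i > 0; i--) {
      if (attempt.fetchOnce()) {
        return;                       // success, nothing more to do
      }
      try {
        Thread.sleep(waitMs);         // back off before the next attempt
      } catch (InterruptedException e) {
        throw new InterruptedIOException("Interrupted while retrying.");
      }
    }
    throw new IOException("Operation failed after " + retries + " attempts.");
  }
}

openInfo() uses three attempts and a 4000 ms wait; as in the sketch, it also sleeps after the final failed attempt before throwing.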

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Jun 21 06:37:27 2013
@@ -36,9 +36,15 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_STREAM_BUFFER_SIZE_DEFAULT = 4096;
   public static final String  DFS_BYTES_PER_CHECKSUM_KEY = "dfs.bytes-per-checksum";
   public static final int     DFS_BYTES_PER_CHECKSUM_DEFAULT = 512;
+  public static final String  DFS_CLIENT_RETRY_POLICY_ENABLED_KEY = "dfs.client.retry.policy.enabled";
+  public static final boolean DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT = false; 
+  public static final String  DFS_CLIENT_RETRY_POLICY_SPEC_KEY = "dfs.client.retry.policy.spec";
+  public static final String  DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT = "10000,6,60000,10"; //t1,n1,t2,n2,... 
   public static final String  DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
   public static final int     DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
-  
+  public static final String  DFS_CLIENT_USE_DN_HOSTNAME = "dfs.client.use.datanode.hostname";
+  public static final boolean DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT = false;
+
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";
   public static final String  DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY = "dfs.namenode.backup.http-address";
@@ -46,6 +52,15 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.backup.dnrpc-address";
   public static final String  DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY = "dfs.datanode.balance.bandwidthPerSec";
   public static final long    DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT = 1024*1024;
+  public static final String  DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
+  public static final long    DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 4 * 1024 * 1024; // 4MB
+  public static final String  DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY = "dfs.datanode.drop.cache.behind.writes";
+  public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT = false;
+  public static final String  DFS_DATANODE_SYNC_BEHIND_WRITES_KEY = "dfs.datanode.sync.behind.writes";
+  public static final boolean DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT = false;
+  public static final String  DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY = "dfs.datanode.drop.cache.behind.reads";
+  public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT = false;
+  
   public static final String  DFS_NAMENODE_HTTP_PORT_KEY = "dfs.http.port";
   public static final int     DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;
   public static final String  DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address";
@@ -58,6 +73,8 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_NAMENODE_SAFEMODE_EXTENSION_DEFAULT = 30000;
   public static final String  DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY = "dfs.namenode.safemode.threshold-pct";
   public static final float   DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT = 0.999f;
+  public static final String  DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY = "dfs.namenode.safemode.min.datanodes";
+  public static final int     DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT = 0;
   public static final String  DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY = "dfs.namenode.secondary.http-address";
   public static final String  DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50090";
   public static final String  DFS_NAMENODE_CHECKPOINT_PERIOD_KEY = "dfs.namenode.checkpoint.period";
@@ -90,6 +107,8 @@ public class DFSConfigKeys extends Commo
   public static final boolean DFS_WEBHDFS_ENABLED_DEFAULT = false;
   public static final String  DFS_PERMISSIONS_ENABLED_KEY = "dfs.permissions.enabled";
   public static final boolean DFS_PERMISSIONS_ENABLED_DEFAULT = true;
+  public static final String  DFS_PERSIST_BLOCKS_KEY = "dfs.persist.blocks";
+  public static final boolean DFS_PERSIST_BLOCKS_DEFAULT = false;
   public static final String  DFS_PERMISSIONS_SUPERUSERGROUP_KEY = "dfs.permissions.superusergroup";
   public static final String  DFS_ADMIN = "dfs.cluster.administrators";
   public static final String  DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT = "supergroup";
@@ -101,6 +120,10 @@ public class DFSConfigKeys extends Commo
   public static final boolean DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH_DEFAULT = false;
   public static final String  DFS_LIST_LIMIT = "dfs.ls.limit";
   public static final int     DFS_LIST_LIMIT_DEFAULT = 1000;
+  public static final String  DFS_DATANODE_USE_DN_HOSTNAME = "dfs.datanode.use.datanode.hostname";
+  public static final boolean DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT = false;
+  public static final String  DFS_DATANODE_SYNCONCLOSE_KEY = "dfs.datanode.synconclose";
+  public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false;
 
   //Delegation token related keys
   public static final String  DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY = "dfs.namenode.delegation.key.update-interval";
@@ -109,6 +132,8 @@ public class DFSConfigKeys extends Commo
   public static final long    DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT = 24*60*60*1000;
   public static final String  DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY = "dfs.namenode.delegation.token.max-lifetime";
   public static final long    DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = 7*24*60*60*1000;
+  public static final String  DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY = "dfs.namenode.delegation.token.always-use"; // for tests
+  public static final boolean DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT = false;
 
   //Following keys have no defaults
   public static final String  DFS_DATANODE_DATA_DIR_KEY = "dfs.datanode.data.dir";
@@ -119,7 +144,6 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_NAMENODE_NAME_DIR_KEY = "dfs.namenode.name.dir";
   public static final String  DFS_NAMENODE_EDITS_DIR_KEY = "dfs.namenode.edits.dir";
   public static final String  DFS_CLIENT_READ_PREFETCH_SIZE_KEY = "dfs.client.read.prefetch.size"; 
-  public static final String  DFS_CLIENT_RETRY_WINDOW_BASE= "dfs.client.retry.window.base";
   public static final String  DFS_METRICS_SESSION_ID_KEY = "dfs.metrics.session-id";
   public static final String  DFS_DATANODE_HOST_NAME_KEY = "dfs.datanode.hostname";
   public static final String  DFS_DATANODE_STORAGEID_KEY = "dfs.datanode.StorageId";
@@ -128,6 +152,25 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_CLIENT_SOCKET_TIMEOUT_KEY = "dfs.client.socket-timeout";
   public static final String  DFS_NAMENODE_CHECKPOINT_DIR_KEY = "dfs.namenode.checkpoint.dir";
   public static final String  DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY = "dfs.namenode.checkpoint.edits.dir";
+  public static final String  DFS_CLIENT_LOCAL_INTERFACES = "dfs.client.local.interfaces";
+  
+  // Whether to enable datanode's stale state detection and usage for reads
+  public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY = "dfs.namenode.avoid.read.stale.datanode";
+  public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT = false;
+  // Whether to enable datanode's stale state detection and usage for writes
+  public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY = "dfs.namenode.avoid.write.stale.datanode";
+  public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT = false;
+  // The time interval for marking datanodes as stale
+  public static final String DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY = "dfs.namenode.stale.datanode.interval";
+  public static final long DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT = 30 * 1000; // 30s
+  // The stale interval cannot be too small, otherwise nodes would flap between stale and fresh too frequently.
+  // This value is the multiple of the heartbeat interval that defines the minimum allowed stale interval.
+  public static final String DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY = "dfs.namenode.stale.datanode.minimum.interval";
+  public static final int DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT = 3; // i.e. min_interval is 3 * heartbeat_interval = 9s
+  // When the percentage of stale datanodes reaches this ratio,
+  // allow writing to stale nodes to prevent hotspots.
+  public static final String DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY = "dfs.namenode.write.stale.datanode.ratio";
+  public static final float DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT = 0.5f;
 
   //Code in hdfs is not updated to use these keys.
   public static final String  DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY = "dfs.client.block.write.locateFollowingBlock.retries";
@@ -174,8 +217,6 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_NAMENODE_HANDLER_COUNT_DEFAULT = 10;
   public static final String  DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY = "dfs.namenode.service.handler.count";
   public static final int     DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT = 10;
-  public static final String  DFS_SUPPORT_APPEND_KEY = "dfs.support.append";
-  public static final boolean DFS_SUPPORT_APPEND_DEFAULT = false;
   public static final String  DFS_HTTPS_ENABLE_KEY = "dfs.https.enable";
   public static final boolean DFS_HTTPS_ENABLE_DEFAULT = false;
   public static final String  DFS_DEFAULT_CHUNK_VIEW_SIZE_KEY = "dfs.default.chunk.view.size";
@@ -185,6 +226,14 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_DATANODE_IPC_ADDRESS_KEY = "dfs.datanode.ipc.address";
   public static final String  DFS_DATANODE_IPC_ADDRESS_DEFAULT = "0.0.0.0:50020";
 
+  //Replication monitoring related keys
+  public static final String DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION =
+      "dfs.namenode.invalidate.work.pct.per.iteration";
+  public static final float DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION_DEFAULT = 0.32f;
+  public static final String DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION =
+      "dfs.namenode.replication.work.multiplier.per.iteration";
+  public static final int DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT = 2;
+
   public static final String  DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY = "dfs.block.access.token.enable";
   public static final boolean DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT = false;
   public static final String  DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY = "dfs.block.access.key.update.interval";
@@ -202,11 +251,18 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT = 0;
   public static final String  DFS_BLOCK_INVALIDATE_LIMIT_KEY = "dfs.block.invalidate.limit";
   public static final int     DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT = 100;
+  public static final String  DFS_MAX_CORRUPT_FILES_RETURNED_KEY = "dfs.corruptfilesreturned.max";
+  public static final int     DFS_MAX_CORRUPT_FILES_RETURNED_DEFAULT = 500;
+  
   public static final String  DFS_CLIENT_READ_SHORTCIRCUIT_KEY = "dfs.client.read.shortcircuit";
   public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT = false;
   public static final String  DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY = "dfs.client.read.shortcircuit.skip.checksum";
   public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT = false;
 
+  public static final String DFS_IMAGE_TRANSFER_RATE_KEY =
+                                           "dfs.image.transfer.bandwidthPerSec";
+  public static final long DFS_IMAGE_TRANSFER_RATE_DEFAULT = 0;  //no throttling
+
   //Keys with no defaults
   public static final String  DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
   public static final String  DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY = "dfs.datanode.socket.write.timeout";
@@ -219,14 +275,16 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_NAMENODE_KEYTAB_FILE_KEY = "dfs.namenode.keytab.file";
   public static final String  DFS_NAMENODE_USER_NAME_KEY = "dfs.namenode.kerberos.principal";
   public static final String  DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY = "dfs.namenode.kerberos.https.principal";
+  public static final String  DFS_NAMENODE_INTERNAL_SPENGO_USER_NAME_KEY = "dfs.namenode.kerberos.internal.spnego.principal";
   
   public static final String  DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY = "dfs.secondary.namenode.keytab.file";
   public static final String  DFS_SECONDARY_NAMENODE_USER_NAME_KEY = "dfs.secondary.namenode.kerberos.principal";
   public static final String  DFS_SECONDARY_NAMENODE_KRB_HTTPS_USER_NAME_KEY = "dfs.secondary.namenode.kerberos.https.principal";
+  public static final String  DFS_SECONDARY_NAMENODE_INTERNAL_SPENGO_USER_NAME_KEY = "dfs.secondary.namenode.kerberos.internal.spnego.principal";
   public static final String  DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY = "dfs.namenode.name.cache.threshold";
   public static final int     DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT = 10;
   
   public static final String  DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY = "dfs.web.authentication.kerberos.principal";
   public static final String  DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY = "dfs.web.authentication.kerberos.keytab";
-  public static final String DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY = "dfs.block.local-path-access.user";
+  public static final String  DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY = "dfs.block.local-path-access.user";
 }
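
Several of the keys added above are simple on/off or tuning knobs. A hedged sketch of how a few of them might be set programmatically through org.apache.hadoop.conf.Configuration; the key strings and defaults are taken from this patch, the reading of the retry spec as (sleep-time, retry-count) pairs follows the t1,n1,t2,n2 comment above, and the wrapper class itself is illustrative:

import org.apache.hadoop.conf.Configuration;

public class NewDfsKeysSketch {
  public static Configuration exampleConf() {
    Configuration conf = new Configuration();
    // Client retry policy: the spec "t1,n1,t2,n2,..." is read as pairs of
    // (sleep time in ms, number of retries), e.g. 6 retries at 10s, then
    // 10 retries at 60s.
    conf.setBoolean("dfs.client.retry.policy.enabled", true);
    conf.set("dfs.client.retry.policy.spec", "10000,6,60000,10");
    // Connect to datanodes by hostname rather than by IP address.
    conf.setBoolean("dfs.client.use.datanode.hostname", true);
    // NameNode-side: avoid stale datanodes for reads, using the 30s default
    // stale interval defined above.
    conf.setBoolean("dfs.namenode.avoid.read.stale.datanode", true);
    conf.setLong("dfs.namenode.stale.datanode.interval", 30 * 1000L);
    return conf;
  }
}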

Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/DFSUtil.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/DFSUtil.java Fri Jun 21 06:37:27 2013
@@ -22,8 +22,10 @@ import java.io.UnsupportedEncodingExcept
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Comparator;
 import java.util.StringTokenizer;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -123,5 +125,75 @@ public class DFSUtil {
       throw new IllegalArgumentException(ue);
     }
   }
+  
+  /**
+   * Get DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION from configuration.
+   * 
+   * @param conf Configuration
+   * @return Value of DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION
+   * @throws IllegalArgumentException If the value is not in (0, 1]
+   */
+  public static float getInvalidateWorkPctPerIteration(Configuration conf) {
+    float blocksInvalidateWorkPct = conf.getFloat(
+        DFSConfigKeys.DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION,
+        DFSConfigKeys.DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION_DEFAULT);
+    if (blocksInvalidateWorkPct <= 0.0f || blocksInvalidateWorkPct > 1.0f) {
+      throw new IllegalArgumentException(
+          DFSConfigKeys.DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION + " = '"
+              + blocksInvalidateWorkPct + "' is invalid. "
+              + "It should be a float greater than 0 and no greater "
+              + "than 1, indicating a percentage.");
+    }
+    return blocksInvalidateWorkPct; 
+  }
+  
+  /**
+   * Get DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION 
+   * from configuration.
+   * 
+   * @param conf Configuration
+   * @return Value of DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION
+   * @throws IllegalArgumentException If the value is not positive
+   */
+  public static int getReplWorkMultiplier(Configuration conf) {
+    int blocksReplWorkMultiplier = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
+        DFSConfigKeys.
+          DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT);
+    if (blocksReplWorkMultiplier <= 0) {
+      throw new IllegalArgumentException(
+          DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION
+              + " = '" + blocksReplWorkMultiplier + "' is invalid. "
+              + "It should be a positive, non-zero integer value.");
+    }
+    return blocksReplWorkMultiplier;
+  }
+   
+  /**
+   * Comparator for sorting DatanodeInfo[] based on staleness. Stale nodes
+   * are moved to the end of the array when sorting with this comparator.
+   */
+  public static class StaleComparator implements Comparator<DatanodeInfo> {
+    private long staleInterval;
+
+    /**
+     * Constructor of StaleComparator
+     * 
+     * @param interval
+     *          The time interval for marking datanodes as stale; passed in
+     *          by the caller because it may be changed dynamically
+     */
+    public StaleComparator(long interval) {
+      this.staleInterval = interval;
+    }
+
+    @Override
+    public int compare(DatanodeInfo a, DatanodeInfo b) {
+      // Stale nodes will be moved behind the normal nodes
+      boolean aStale = a.isStale(staleInterval);
+      boolean bStale = b.isStale(staleInterval);
+      return aStale == bStale ? 0 : (aStale ? 1 : -1);
+    }
+  }   
 }
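
A hedged sketch of how the new StaleComparator might be applied: sort a replica list so that stale datanodes fall to the end before the locations are handed to a reader. Only the comparator and the two configuration constants come from this patch; the helper method and the way the interval is read are illustrative.

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

public class StaleSortSketch {

  /** Move stale replicas to the tail of the location list. */
  static void deprioritizeStaleNodes(DatanodeInfo[] locations, Configuration conf) {
    long staleInterval = conf.getLong(
        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
    // Arrays.sort is stable, so fresh nodes keep their relative order while
    // stale nodes are pushed behind them.
    Arrays.sort(locations, new DFSUtil.StaleComparator(staleInterval));
  }
}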