Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/06/10 02:11:33 UTC

svn commit: r1134137 [1/2] - in /hadoop/hdfs/branches/HDFS-1073: ./ src/c++/libhdfs/ src/contrib/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/common/ src/java/org...

Author: todd
Date: Fri Jun 10 00:11:32 2011
New Revision: 1134137

URL: http://svn.apache.org/viewvc?rev=1134137&view=rev
Log:
Merge trunk into HDFS-1073. Needs a few edits post-merge due to inclusion of HDFS-2003.

Added:
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/SocketCache.java
      - copied unchanged from r1134136, hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/SocketCache.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
      - copied unchanged from r1134136, hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java
      - copied unchanged from r1134136, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSAddressConfig.java
      - copied unchanged from r1134136, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSAddressConfig.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestParallelRead.java
      - copied unchanged from r1134136, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestParallelRead.java
Modified:
    hadoop/hdfs/branches/HDFS-1073/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/CHANGES.txt
    hadoop/hdfs/branches/HDFS-1073/build.xml
    hadoop/hdfs/branches/HDFS-1073/src/c++/libhdfs/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/c++/libhdfs/configure.ac
    hadoop/hdfs/branches/HDFS-1073/src/contrib/hdfsproxy/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/java/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/BlockReader.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/DataNodeCluster.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
    hadoop/hdfs/branches/HDFS-1073/src/webapps/datanode/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/webapps/hdfs/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/webapps/secondary/   (props changed)

Propchange: hadoop/hdfs/branches/HDFS-1073/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun 10 00:11:32 2011
@@ -2,4 +2,4 @@
 /hadoop/hdfs/branches/HDFS-1052:987665-1095512
 /hadoop/hdfs/branches/HDFS-265:796829-820463
 /hadoop/hdfs/branches/branch-0.21:820487
-/hadoop/hdfs/trunk:1086482-1132839
+/hadoop/hdfs/trunk:1086482-1134136

Modified: hadoop/hdfs/branches/HDFS-1073/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/CHANGES.txt?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-1073/CHANGES.txt Fri Jun 10 00:11:32 2011
@@ -287,6 +287,9 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
+    HDFS-1875. MiniDFSCluster hard-codes dfs.datanode.address to localhost
+    (Eric Payne via mattf)
+
     HDFS-2019. Fix all the places where Java method File.list is used with
     FileUtil.list API (Bharath Mundlapudi via mattf)
 
@@ -390,6 +393,7 @@ Trunk (unreleased changes)
 
     HDFS-1295. Improve namenode restart times by short-circuiting the
     first block reports from datanodes. (Matt Foley via suresh)
+    Corrected merge error in DataNode.java. (Matt Foley)
 
     HDFS-1843. Discover file not found early for file append. 
     (Bharath Mundlapudi via jitendra)
@@ -485,6 +489,14 @@ Trunk (unreleased changes)
     HDFS-2029. In TestWriteRead, check visible length immediately after
     openning the file and fix code style.  (John George via szetszwo)
 
+    HDFS-2040. Only build libhdfs if a flag is passed. (eli)
+
+    HDFS-1586. Add InterfaceAudience and InterfaceStability annotations to 
+    MiniDFSCluster. (suresh)
+
+    HDFS-2003. Separate FSEditLog reading logic from edit log memory state
+    building logic. (Ivan Kelly via todd)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
@@ -495,6 +507,9 @@ Trunk (unreleased changes)
     HDFS-1826. NameNode should save image to name directories in parallel
     during upgrade. (Matt Foley via hairong)
 
+    HDFS-941. The DFS client should cache and reuse open sockets to datanodes
+    while performing reads. (bc Wong and Todd Lipcon via todd)
+
   BUG FIXES
 
     HDFS-1449. Fix test failures - ExtendedBlock must return 
@@ -942,6 +957,11 @@ Release 0.22.0 - Unreleased
     HDFS-1980. Move build/webapps deeper in the build directory heirarchy
     to aid eclipse users. (todd)
 
+    HDFS-1619. Remove AC_TYPE* from the libhdfs. (Roman Shaposhnik via eli)
+
+    HDFS-1948  Forward port 'hdfs-1520 lightweight namenode operation to
+    trigger lease recovery' (stack)
+
   OPTIMIZATIONS
 
     HDFS-1140. Speedup INode.getPathComponents. (Dmytro Molkov via shv)

Modified: hadoop/hdfs/branches/HDFS-1073/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/build.xml?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/build.xml (original)
+++ hadoop/hdfs/branches/HDFS-1073/build.xml Fri Jun 10 00:11:32 2011
@@ -326,10 +326,6 @@
 
   </target>
 
-  <target name="set-c++-libhdfs">
-    <property name="libhdfs" value="true"/>
-  </target>
-
   <import file="${test.src.dir}/aop/build/aop.xml"/>
 
   <target name="compile-hdfs-classes" depends="init">
@@ -1102,7 +1098,7 @@
     </macro_tar>
   </target>
 
-  <target name="bin-package" depends="set-c++-libhdfs, compile, compile-c++-libhdfs, jar, jar-test, ant-tasks, jsvc" 
+  <target name="bin-package" depends="compile, compile-c++-libhdfs, jar, jar-test, ant-tasks, jsvc" 
 		description="assembles artifacts for binary target">
     <mkdir dir="${dist.dir}"/>
     <mkdir dir="${dist.dir}/lib"/>
@@ -1118,7 +1114,7 @@
     </copy>
 
     <copy todir="${dist.dir}/lib" includeEmptyDirs="false">
-      <fileset dir="${build.dir}/c++/${build.platform}/lib">
+      <fileset dir="${build.dir}/c++/${build.platform}/lib" erroronmissingdir="false">
         <include name="**"/>
       </fileset>
     </copy>

Propchange: hadoop/hdfs/branches/HDFS-1073/src/c++/libhdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun 10 00:11:32 2011
@@ -1,4 +1,4 @@
 /hadoop/core/branches/branch-0.19/mapred/src/c++/libhdfs:713112
 /hadoop/core/trunk/src/c++/libhdfs:776175-784663
 /hadoop/hdfs/branches/HDFS-1052/src/c++/libhdfs:987665-1095512
-/hadoop/hdfs/trunk/src/c++/libhdfs:1086482-1132839
+/hadoop/hdfs/trunk/src/c++/libhdfs:1086482-1134136

Modified: hadoop/hdfs/branches/HDFS-1073/src/c++/libhdfs/configure.ac
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/c%2B%2B/libhdfs/configure.ac?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/c++/libhdfs/configure.ac (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/c++/libhdfs/configure.ac Fri Jun 10 00:11:32 2011
@@ -18,7 +18,6 @@
 # Autoconf input file
 # $Id$
 
-AC_PREREQ(2.61)
 AC_INIT([libhdfs], [0.1.0], omalley@apache.org)
 AC_PREFIX_DEFAULT([`pwd`/../install])
 AC_CONFIG_AUX_DIR([config])
@@ -122,9 +121,5 @@ AC_C_CONST
 AC_C_VOLATILE
 #AC_FUNC_MALLOC
 AC_HEADER_STDBOOL
-AC_TYPE_INT16_T
-AC_TYPE_INT32_T
-AC_TYPE_INT64_T
-AC_TYPE_UINT16_T
 AC_SUBST(PRODUCT_MK)
 AC_OUTPUT

Propchange: hadoop/hdfs/branches/HDFS-1073/src/contrib/hdfsproxy/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun 10 00:11:32 2011
@@ -3,4 +3,4 @@
 /hadoop/hdfs/branches/HDFS-1052/src/contrib/hdfsproxy:987665-1095512
 /hadoop/hdfs/branches/HDFS-265/src/contrib/hdfsproxy:796829-820463
 /hadoop/hdfs/branches/branch-0.21/src/contrib/hdfsproxy:820487
-/hadoop/hdfs/trunk/src/contrib/hdfsproxy:1086482-1132839
+/hadoop/hdfs/trunk/src/contrib/hdfsproxy:1086482-1134136

Propchange: hadoop/hdfs/branches/HDFS-1073/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun 10 00:11:32 2011
@@ -3,4 +3,4 @@
 /hadoop/hdfs/branches/HDFS-1052/src/java:987665-1095512
 /hadoop/hdfs/branches/HDFS-265/src/java:796829-820463
 /hadoop/hdfs/branches/branch-0.21/src/java:820487
-/hadoop/hdfs/trunk/src/java:1086482-1132839
+/hadoop/hdfs/trunk/src/java:1086482-1134136

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/BlockReader.java Fri Jun 10 00:11:32 2011
@@ -46,12 +46,29 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.util.DataChecksum;
 
 /** This is a wrapper around connection to datanode
- * and understands checksum, offset etc
+ * and understands checksum, offset etc.
+ *
+ * Terminology:
+ * <dl>
+ * <dt>block</dt>
+ *   <dd>The hdfs block, typically large (~64MB).
+ *   </dd>
+ * <dt>chunk</dt>
+ *   <dd>A block is divided into chunks, each comes with a checksum.
+ *       We want transfers to be chunk-aligned, to be able to
+ *       verify checksums.
+ *   </dd>
+ * <dt>packet</dt>
+ *   <dd>A grouping of chunks used for transport. It contains a
+ *       header, followed by checksum data, followed by real data.
+ *   </dd>
+ * </dl>
+ * Please see DataNode for the RPC specification.
  */
 @InterfaceAudience.Private
 public class BlockReader extends FSInputChecker {
 
-  Socket dnSock; //for now just sending checksumOk.
+  Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
   private DataInputStream in;
   private DataChecksum checksum;
 
@@ -77,10 +94,12 @@ public class BlockReader extends FSInput
    */
   private final long bytesNeededToFinish;
 
-  private boolean gotEOS = false;
+  private boolean eos = false;
+  private boolean sentStatusCode = false;
   
   byte[] skipBuf = null;
   ByteBuffer checksumBytes = null;
+  /** Amount of unread data in the current received packet */
   int dataLeft = 0;
   
   /* FSInputChecker interface */
@@ -99,7 +118,7 @@ public class BlockReader extends FSInput
     // This has to be set here, *before* the skip, since we can
     // hit EOS during the skip, in the case that our entire read
     // is smaller than the checksum chunk.
-    boolean eosBefore = gotEOS;
+    boolean eosBefore = eos;
 
     //for the first read, skip the extra bytes at the front.
     if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
@@ -115,11 +134,14 @@ public class BlockReader extends FSInput
     }
     
     int nRead = super.read(buf, off, len);
-    
-    // if gotEOS was set in the previous read and checksum is enabled :
-    if (gotEOS && !eosBefore && nRead >= 0 && needChecksum()) {
-      //checksum is verified and there are no errors.
-      checksumOk(dnSock);
+
+    // if eos was set in the previous read, send a status code to the DN
+    if (eos && !eosBefore && nRead >= 0) {
+      if (needChecksum()) {
+        sendReadResult(dnSock, CHECKSUM_OK);
+      } else {
+        sendReadResult(dnSock, SUCCESS);
+      }
     }
     return nRead;
   }
@@ -191,7 +213,7 @@ public class BlockReader extends FSInput
                                        int len, byte[] checksumBuf) 
                                        throws IOException {
     // Read one chunk.
-    if ( gotEOS ) {
+    if (eos) {
       // Already hit EOF
       return -1;
     }
@@ -246,7 +268,7 @@ public class BlockReader extends FSInput
 
     if (checksumSize > 0) {
 
-      // How many chunks left in our stream - this is a ceiling
+      // How many chunks left in our packet - this is a ceiling
       // since we may have a partial chunk at the end of the file
       int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
 
@@ -307,7 +329,7 @@ public class BlockReader extends FSInput
                               ", dataLen : " + dataLen);
       }
 
-      gotEOS = true;
+      eos = true;
     }
 
     if ( bytesToRead == 0 ) {
@@ -335,7 +357,7 @@ public class BlockReader extends FSInput
     // The total number of bytes that we need to transfer from the DN is
     // the amount that the user wants (bytesToRead), plus the padding at
     // the beginning in order to chunk-align. Note that the DN may elect
-    // to send more than this amount if the read ends mid-chunk.
+    // to send more than this amount if the read starts/ends mid-chunk.
     this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
 
     this.firstChunkOffset = firstChunkOffset;
@@ -364,6 +386,21 @@ public class BlockReader extends FSInput
                           len, bufferSize, verifyChecksum, "");
   }
 
+  /**
+   * Create a new BlockReader specifically to satisfy a read.
+   * This method also sends the OP_READ_BLOCK request.
+   *
+   * @param sock  An established Socket to the DN. The BlockReader will not close it normally
+   * @param file  File location
+   * @param block  The block object
+   * @param blockToken  The block token for security
+   * @param startOffset  The read offset, relative to block head
+   * @param len  The number of bytes to read
+   * @param bufferSize  The IO buffer size (not the client buffer size)
+   * @param verifyChecksum  Whether to verify checksum
+   * @param clientName  Client name
+   * @return New BlockReader instance, or null on error.
+   */
   public static BlockReader newBlockReader( Socket sock, String file,
                                      ExtendedBlock block, 
                                      Token<BlockTokenIdentifier> blockToken,
@@ -423,6 +460,10 @@ public class BlockReader extends FSInput
   public synchronized void close() throws IOException {
     startOffset = -1;
     checksum = null;
+    if (dnSock != null) {
+      dnSock.close();
+    }
+
     // in will be closed when its Socket is closed.
   }
   
@@ -432,22 +473,43 @@ public class BlockReader extends FSInput
   public int readAll(byte[] buf, int offset, int len) throws IOException {
     return readFully(this, buf, offset, len);
   }
-  
-  /* When the reader reaches end of the read and there are no checksum
-   * errors, we send OP_STATUS_CHECKSUM_OK to datanode to inform that 
-   * checksum was verified and there was no error.
-   */ 
-  void checksumOk(Socket sock) {
+
+  /**
+   * Take the socket used to talk to the DN.
+   */
+  public Socket takeSocket() {
+    assert hasSentStatusCode() :
+      "BlockReader shouldn't give back sockets mid-read";
+    Socket res = dnSock;
+    dnSock = null;
+    return res;
+  }
+
+  /**
+   * Whether the BlockReader has reached the end of its input stream
+   * and successfully sent a status code back to the datanode.
+   */
+  public boolean hasSentStatusCode() {
+    return sentStatusCode;
+  }
+
+  /**
+   * When the reader reaches end of the read, it sends a status response
+   * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
+   * closing our connection (which we will re-open), but won't affect
+   * data correctness.
+   */
+  void sendReadResult(Socket sock, DataTransferProtocol.Status statusCode) {
+    assert !sentStatusCode : "already sent status code to " + sock;
     try {
       OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
-      CHECKSUM_OK.writeOutputStream(out);
+      statusCode.writeOutputStream(out);
       out.flush();
+      sentStatusCode = true;
     } catch (IOException e) {
-      // its ok not to be able to send this.
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Could not write to datanode " + sock.getInetAddress() +
-                  ": " + e.getMessage());
-      }
+      // It's ok not to be able to send this. But something is probably wrong.
+      LOG.info("Could not send read status (" + statusCode + ") to datanode " +
+               sock.getInetAddress() + ": " + e.getMessage());
     }
   }
   

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java Fri Jun 10 00:11:32 2011
@@ -136,6 +136,8 @@ public class DFSClient implements FSCons
   final int hdfsTimeout;    // timeout value for a DFS operation.
   final LeaseRenewer leaserenewer;
 
+  final SocketCache socketCache;
+  
   /**
    * A map from file names to {@link DFSOutputStream} objects
    * that are currently being written by this client.
@@ -279,6 +281,10 @@ public class DFSClient implements FSCons
     defaultReplication = (short) 
       conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, 
                   DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+    
+    this.socketCache = new SocketCache(
+        conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
+            DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT));
 
     if (nameNodeAddr != null && rpcNamenode == null) {
       this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
@@ -539,6 +545,23 @@ public class DFSClient implements FSCons
   }
 
   /**
+   * Recover a file's lease
+   * @param src a file's path
+   * @return true if the file is already closed
+   * @throws IOException
+   */
+  boolean recoverLease(String src) throws IOException {
+    checkOpen();
+
+    try {
+      return namenode.recoverLease(src, clientName);
+    } catch (RemoteException re) {
+      throw re.unwrapRemoteException(FileNotFoundException.class,
+                                     AccessControlException.class);
+    }
+  }
+
+  /**
    * Get block location info about file
    * 
    * getBlockLocations() returns a list of hostnames that store 

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Jun 10 00:11:32 2011
@@ -44,6 +44,8 @@ public class DFSConfigKeys extends Commo
   public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT = true;
   public static final String  DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY = "dfs.client.block.write.replace-datanode-on-failure.policy";
   public static final String  DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT = "DEFAULT";
+  public static final String  DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = "dfs.client.socketcache.capacity";
+  public static final int     DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
   
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";
@@ -83,6 +85,8 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_DEFAULT = "ssl-client.xml";
   public static final String  DFS_CLIENT_HTTPS_NEED_AUTH_KEY = "dfs.client.https.need-auth";
   public static final boolean DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT = false;
+  public static final String  DFS_CLIENT_CACHED_CONN_RETRY_KEY = "dfs.client.cached.conn.retry";
+  public static final int     DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT = 3;
   public static final String  DFS_NAMENODE_ACCESSTIME_PRECISION_KEY = "dfs.namenode.accesstime.precision";
   public static final long    DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT = 3600000;
   public static final String  DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY = "dfs.namenode.replication.considerLoad";
@@ -115,6 +119,8 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT = 0;
   public static final String  DFS_DATANODE_SYNCONCLOSE_KEY = "dfs.datanode.synconclose";
   public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false;
+  public static final String  DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY = "dfs.datanode.socket.reuse.keepalive";
+  public static final int     DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT = 1000;
 
   //Delegation token related keys
   public static final String  DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY = "dfs.namenode.delegation.key.update-interval";
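
The three new keys above control the client-side socket cache and the matching datanode keepalive: dfs.client.socketcache.capacity (default 16 sockets per DFSClient), dfs.client.cached.conn.retry (default 3 attempts on a cached connection), and dfs.datanode.socket.reuse.keepalive (default 1000 ms). As a hedged sketch of how they could be overridden programmatically (whether one would tune them this way in practice is an assumption, not something this commit prescribes):

    import org.apache.hadoop.conf.Configuration;

    public class SocketCacheConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Allow up to 32 idle sockets per client instead of the default 16.
        conf.setInt("dfs.client.socketcache.capacity", 32);
        // Retry a cached (possibly stale) connection up to 3 times (the default).
        conf.setInt("dfs.client.cached.conn.retry", 3);
        // Datanode side: wait 1000 ms (the default) for a follow-on request
        // before closing a kept-alive connection.
        conf.setInt("dfs.datanode.socket.reuse.keepalive", 1000);
        System.out.println(conf.getInt("dfs.client.socketcache.capacity", 16));
      }
    }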

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSInputStream.java Fri Jun 10 00:11:32 2011
@@ -43,7 +43,6 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
@@ -56,8 +55,9 @@ import org.apache.hadoop.util.StringUtil
  ****************************************************************/
 @InterfaceAudience.Private
 public class DFSInputStream extends FSInputStream {
+  private final SocketCache socketCache;
+
   private final DFSClient dfsClient;
-  private Socket s = null;
   private boolean closed = false;
 
   private final String src;
@@ -92,7 +92,9 @@ public class DFSInputStream extends FSIn
   private int buffersize = 1;
   
   private byte[] oneByteBuf = new byte[1]; // used for 'int read()'
-  
+
+  private int nCachedConnRetry;
+
   void addToDeadNodes(DatanodeInfo dnInfo) {
     deadNodes.put(dnInfo, dnInfo);
   }
@@ -103,9 +105,14 @@ public class DFSInputStream extends FSIn
     this.verifyChecksum = verifyChecksum;
     this.buffersize = buffersize;
     this.src = src;
+    this.socketCache = dfsClient.socketCache;
     prefetchSize = this.dfsClient.conf.getLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
         10 * dfsClient.defaultBlockSize);
-    timeWindow = this.dfsClient.conf.getInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWindow);
+    timeWindow = this.dfsClient.conf.getInt(
+        DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWindow);
+    nCachedConnRetry = this.dfsClient.conf.getInt(
+        DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY,
+        DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
     openInfo();
   }
 
@@ -371,15 +378,11 @@ public class DFSInputStream extends FSIn
       throw new IOException("Attempted to read past end of file");
     }
 
-    if ( blockReader != null ) {
-      blockReader.close(); 
+    // Will be getting a new BlockReader.
+    if (blockReader != null) {
+      closeBlockReader(blockReader);
       blockReader = null;
     }
-    
-    if (s != null) {
-      s.close();
-      s = null;
-    }
 
     //
     // Connect to best DataNode for desired Block, with potential offset
@@ -400,14 +403,12 @@ public class DFSInputStream extends FSIn
       InetSocketAddress targetAddr = retval.addr;
 
       try {
-        s = dfsClient.socketFactory.createSocket();
-        NetUtils.connect(s, targetAddr, dfsClient.socketTimeout);
-        s.setSoTimeout(dfsClient.socketTimeout);
         ExtendedBlock blk = targetBlock.getBlock();
         Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
         
-        blockReader = BlockReader.newBlockReader(s, src, blk,
-            accessToken, 
+        blockReader = getBlockReader(
+            targetAddr, src, blk,
+            accessToken,
             offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
             buffersize, verifyChecksum, dfsClient.clientName);
         return chosenNode;
@@ -437,13 +438,6 @@ public class DFSInputStream extends FSIn
           // Put chosen node into dead list, continue
           addToDeadNodes(chosenNode);
         }
-        if (s != null) {
-          try {
-            s.close();
-          } catch (IOException iex) {
-          }                        
-        }
-        s = null;
       }
     }
   }
@@ -457,16 +451,11 @@ public class DFSInputStream extends FSIn
       return;
     }
     dfsClient.checkOpen();
-    
-    if ( blockReader != null ) {
-      blockReader.close();
+
+    if (blockReader != null) {
+      closeBlockReader(blockReader);
       blockReader = null;
     }
-    
-    if (s != null) {
-      s.close();
-      s = null;
-    }
     super.close();
     closed = true;
   }
@@ -479,7 +468,7 @@ public class DFSInputStream extends FSIn
 
   /* This is a used by regular read() and handles ChecksumExceptions.
    * name readBuffer() is chosen to imply similarity to readBuffer() in
-   * ChecksuFileSystem
+   * ChecksumFileSystem
    */ 
   private synchronized int readBuffer(byte buf[], int off, int len,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
@@ -659,7 +648,6 @@ public class DFSInputStream extends FSIn
     //
     // Connect to best DataNode for desired Block, with potential offset
     //
-    Socket dn = null;
     int refetchToken = 1; // only need to get a new access token once
     
     while (true) {
@@ -673,18 +661,15 @@ public class DFSInputStream extends FSIn
       BlockReader reader = null;
           
       try {
-        dn = dfsClient.socketFactory.createSocket();
-        NetUtils.connect(dn, targetAddr, dfsClient.socketTimeout);
-        dn.setSoTimeout(dfsClient.socketTimeout);
         Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
             
         int len = (int) (end - start + 1);
-            
-        reader = BlockReader.newBlockReader(dn, src, 
-                                            block.getBlock(),
-                                            blockToken,
-                                            start, len, buffersize, 
-                                            verifyChecksum, dfsClient.clientName);
+
+        reader = getBlockReader(targetAddr, src,
+                                block.getBlock(),
+                                blockToken,
+                                start, len, buffersize,
+                                verifyChecksum, dfsClient.clientName);
         int nread = reader.readAll(buf, offset, len);
         if (nread != len) {
           throw new IOException("truncated return from reader.read(): " +
@@ -713,8 +698,9 @@ public class DFSInputStream extends FSIn
           }
         }
       } finally {
-        IOUtils.closeStream(reader);
-        IOUtils.closeSocket(dn);
+        if (reader != null) {
+          closeBlockReader(reader);
+        }
       }
       // Put chosen node into dead list, continue
       addToDeadNodes(chosenNode);
@@ -722,6 +708,95 @@ public class DFSInputStream extends FSIn
   }
 
   /**
+   * Close the given BlockReader and cache its socket.
+   */
+  private void closeBlockReader(BlockReader reader) throws IOException {
+    if (reader.hasSentStatusCode()) {
+      Socket oldSock = reader.takeSocket();
+      socketCache.put(oldSock);
+    }
+    reader.close();
+  }
+
+  /**
+   * Retrieve a BlockReader suitable for reading.
+   * This method will reuse the cached connection to the DN if appropriate.
+   * Otherwise, it will create a new connection.
+   *
+   * @param dnAddr  Address of the datanode
+   * @param file  File location
+   * @param block  The Block object
+   * @param blockToken  The access token for security
+   * @param startOffset  The read offset, relative to block head
+   * @param len  The number of bytes to read
+   * @param bufferSize  The IO buffer size (not the client buffer size)
+   * @param verifyChecksum  Whether to verify checksum
+   * @param clientName  Client name
+   * @return New BlockReader instance
+   */
+  protected BlockReader getBlockReader(InetSocketAddress dnAddr,
+                                       String file,
+                                       ExtendedBlock block,
+                                       Token<BlockTokenIdentifier> blockToken,
+                                       long startOffset,
+                                       long len,
+                                       int bufferSize,
+                                       boolean verifyChecksum,
+                                       String clientName)
+      throws IOException {
+    IOException err = null;
+    boolean fromCache = true;
+
+    // Allow retry since there is no way of knowing whether the cached socket
+    // is good until we actually use it.
+    for (int retries = 0; retries <= nCachedConnRetry && fromCache; ++retries) {
+      Socket sock = socketCache.get(dnAddr);
+      if (sock == null) {
+        fromCache = false;
+
+        sock = dfsClient.socketFactory.createSocket();
+        
+        // TCP_NODELAY is crucial here because of bad interactions between
+        // Nagle's Algorithm and Delayed ACKs. With connection keepalive
+        // between the client and DN, the conversation looks like:
+        //   1. Client -> DN: Read block X
+        //   2. DN -> Client: data for block X
+        //   3. Client -> DN: Status OK (successful read)
+        //   4. Client -> DN: Read block Y
+        // The fact that step #3 and #4 are both in the client->DN direction
+        // triggers Nagling. If the DN is using delayed ACKs, this results
+        // in a delay of 40ms or more.
+        //
+        // TCP_NODELAY disables nagling and thus avoids this performance
+        // disaster.
+        sock.setTcpNoDelay(true);
+
+        NetUtils.connect(sock, dnAddr, dfsClient.socketTimeout);
+        sock.setSoTimeout(dfsClient.socketTimeout);
+      }
+
+      try {
+        // The OP_READ_BLOCK request is sent as we make the BlockReader
+        BlockReader reader =
+            BlockReader.newBlockReader(sock, file, block,
+                                       blockToken,
+                                       startOffset, len,
+                                       bufferSize, verifyChecksum,
+                                       clientName);
+        return reader;
+      } catch (IOException ex) {
+        // Our socket is no good.
+        DFSClient.LOG.debug("Error making BlockReader. Closing stale " + sock, ex);
+        sock.close();
+        err = ex;
+      }
+    }
+
+    throw err;
+  }
+
+
+  /**
    * Read bytes starting from the specified position.
    * 
    * @param position start read from this position
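
SocketCache.java itself is copied unchanged from trunk (see the Added list at the top), so its body does not appear in this diff. Purely as an illustration of the client-side pattern that getBlockReader() and closeBlockReader() above depend on, a minimal cache keyed by the remote datanode address might look like the sketch below; the class name, data structure, and eviction policy are assumptions, not the actual implementation:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.Socket;
    import java.util.Iterator;
    import java.util.LinkedList;

    class SocketCacheSketch {
      private final int capacity;
      private final LinkedList<Socket> sockets = new LinkedList<Socket>();

      SocketCacheSketch(int capacity) { this.capacity = capacity; }

      /** Return a cached socket connected to addr, or null if none is cached. */
      synchronized Socket get(InetSocketAddress addr) {
        for (Iterator<Socket> it = sockets.iterator(); it.hasNext(); ) {
          Socket sock = it.next();
          if (addr.equals(sock.getRemoteSocketAddress())) {
            it.remove();
            return sock; // may already be half-closed; the caller retries
          }
        }
        return null;
      }

      /** Cache a socket for later reuse, evicting the oldest entry when full. */
      synchronized void put(Socket sock) {
        if (sock == null || !sock.isConnected()) {
          return;
        }
        if (sockets.size() >= capacity) {
          try {
            sockets.removeFirst().close();
          } catch (IOException ignored) {
          }
        }
        sockets.addLast(sock);
      }
    }

A cached socket can have been closed by the datanode's keepalive timeout without the client noticing until the next request fails, which is why getBlockReader() wraps the cache lookup in a retry loop bounded by dfs.client.cached.conn.retry.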

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Fri Jun 10 00:11:32 2011
@@ -218,6 +218,17 @@ public class DistributedFileSystem exten
     this.verifyChecksum = verifyChecksum;
   }
 
+  /** 
+   * Start the lease recovery of a file
+   *
+   * @param f a file
+   * @return true if the file is already closed
+   * @throws IOException if an error occurs
+   */
+  public boolean recoverLease(Path f) throws IOException {
+    return dfs.recoverLease(getPathName(f));
+  }
+
   @SuppressWarnings("deprecation")
   @Override
   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
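
recoverLease(Path) is the public entry point for the lease-recovery forward port noted in CHANGES.txt (HDFS-1948 / HDFS-1520). A hedged usage sketch, assuming the path resolves to a DistributedFileSystem; the class and argument handling below are illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class RecoverLeaseSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path file = new Path(args[0]);
        FileSystem fs = file.getFileSystem(conf);
        if (fs instanceof DistributedFileSystem) {
          DistributedFileSystem dfs = (DistributedFileSystem) fs;
          // true means the file is already closed; false means lease
          // recovery has been started for it.
          boolean closed = dfs.recoverLease(file);
          System.out.println("recoverLease(" + file + ") -> " + closed);
        }
      }
    }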

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Fri Jun 10 00:11:32 2011
@@ -537,6 +537,17 @@ public interface ClientProtocol extends 
   public void renewLease(String clientName) throws AccessControlException,
       IOException;
 
+  /**
+   * Start lease recovery.
+   * Lightweight NameNode operation to trigger lease recovery
+   * 
+   * @param src path of the file to start lease recovery
+   * @param clientName name of the current client
+   * @return true if the file is already closed
+   * @throws IOException
+   */
+  public boolean recoverLease(String src, String clientName) throws IOException;
+
   public int GET_STATS_CAPACITY_IDX = 0;
   public int GET_STATS_USED_IDX = 1;
   public int GET_STATS_REMAINING_IDX = 2;

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java Fri Jun 10 00:11:32 2011
@@ -83,6 +83,7 @@ public interface HdfsConstants {
   public static int READ_TIMEOUT_EXTENSION = 5 * 1000;
   public static int WRITE_TIMEOUT = 8 * 60 * 1000;
   public static int WRITE_TIMEOUT_EXTENSION = 5 * 1000; //for write pipeline
+  public static int DN_KEEPALIVE_TIMEOUT = 5 * 1000;
 
   /**
    * Defines the NameNode role.

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Fri Jun 10 00:11:32 2011
@@ -69,7 +69,8 @@ class BlockSender implements java.io.Clo
   private long seqno; // sequence number of packet
 
   private boolean transferToAllowed = true;
-  private boolean blockReadFully; //set when the whole block is read
+  // set once entire requested byte range has been sent to the client
+  private boolean sentEntireByteRange;
   private boolean verifyChecksum; //if true, check is verified while reading
   private DataTransferThrottler throttler;
   private final String clientTraceFmt; // format of client trace log message
@@ -493,6 +494,8 @@ class BlockSender implements java.io.Clo
       } catch (IOException e) { //socket error
         throw ioeToSocketException(e);
       }
+
+      sentEntireByteRange = true;
     } finally {
       if (clientTraceFmt != null) {
         final long endTime = System.nanoTime();
@@ -501,12 +504,10 @@ class BlockSender implements java.io.Clo
       close();
     }
 
-    blockReadFully = initialOffset == 0 && offset >= replicaVisibleLength;
-
     return totalRead;
   }
   
-  boolean isBlockReadFully() {
-    return blockReadFully;
+  boolean didSendEntireByteRange() {
+    return sentEntireByteRange;
   }
 }

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Jun 10 00:11:32 2011
@@ -912,27 +912,30 @@ public class DataNode extends Configured
      * @throws IOException
      */
     DatanodeCommand blockReport() throws IOException {
-      // send block report
+      // send block report if timer has expired.
       DatanodeCommand cmd = null;
       long startTime = now();
       if (startTime - lastBlockReport > blockReportInterval) {
-        //
-        // Send latest block report if timer has expired.
-        // Get back a list of local block(s) that are obsolete
-        // and can be safely GC'ed.
-        //
-        long brStartTime = now();
+
+        // Create block report
+        long brCreateStartTime = now();
         BlockListAsLongs bReport = data.getBlockReport(blockPoolId);
+
+        // Send block report
+        long brSendStartTime = now();
         cmd = bpNamenode.blockReport(bpRegistration, blockPoolId, bReport
             .getBlockListAsLongs());
-        long brTime = now() - brStartTime;
-        metrics.addBlockReport(brTime);
-        LOG.info("BlockReport of " + bReport.getNumberOfBlocks() +
-            " blocks got processed in " + brTime + " msecs");
-        //
+
+        // Log the block report processing stats from Datanode perspective
+        long brSendCost = now() - brSendStartTime;
+        long brCreateCost = brSendStartTime - brCreateStartTime;
+        metrics.addBlockReport(brSendCost);
+        LOG.info("BlockReport of " + bReport.getNumberOfBlocks()
+            + " blocks took " + brCreateCost + " msec to generate and "
+            + brSendCost + " msecs for RPC and NN processing");
+
         // If we have sent the first block report, then wait a random
         // time before we start the periodic block reports.
-        //
         if (resetBlockReportTime) {
           lastBlockReport = startTime - R.nextInt((int)(blockReportInterval));
           resetBlockReportTime = false;
@@ -1831,15 +1834,21 @@ public class DataNode extends Configured
     A "PACKET" is defined further below.
     
     The client reads data until it receives a packet with 
-    "LastPacketInBlock" set to true or with a zero length. If there is 
-    no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK:
-    
-    Client optional response at the end of data transmission of any length:
-      +------------------------------+
-      | 2 byte OP_STATUS_CHECKSUM_OK |
-      +------------------------------+
-    The DataNode always checks OP_STATUS_CHECKSUM_OK. It will close the
-    client connection if it is absent.
+    "LastPacketInBlock" set to true or with a zero length. It then replies
+    to DataNode with one of the status codes:
+    - CHECKSUM_OK:    All the chunk checksums have been verified
+    - SUCCESS:        Data received; checksums not verified
+    - ERROR_CHECKSUM: (Currently not used) Detected invalid checksums
+
+      +---------------+
+      | 2 byte Status |
+      +---------------+
+    
+    The DataNode expects all well behaved clients to send the 2 byte
+    status code. And if the client doesn't, the DN will close the
+    connection. So the status code is optional in the sense that it
+    does not affect the correctness of the data. (And the client can
+    always reconnect.)
     
     PACKET : Contains a packet header, checksum and data. Amount of data
     ======== carried is set by BUFFER_SIZE.
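
The comment block above only pins down the framing of the trailer: a single 2-byte status written by the client after the last packet, with the concrete codes (CHECKSUM_OK, SUCCESS, ERROR_CHECKSUM) defined in DataTransferProtocol.Status rather than reproduced here. A sketch of that framing only; the placeholder value below is not a real status code:

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    class ReadStatusTrailerSketch {
      // Placeholder; real values come from DataTransferProtocol.Status.
      static final short PLACEHOLDER_STATUS = 0;

      /** Write the optional 2-byte read status after the last packet. */
      static void sendStatus(OutputStream rawOut, short status) throws IOException {
        DataOutputStream out = new DataOutputStream(rawOut);
        out.writeShort(status); // the "2 byte Status" box in the diagram above
        out.flush();
      }
    }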

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Fri Jun 10 00:11:32 2011
@@ -27,14 +27,18 @@ import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketException;
+import java.nio.channels.ClosedChannelException;
 import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -71,6 +75,7 @@ class DataXceiver extends DataTransferPr
   private final DataNode datanode;
   private final DataXceiverServer dataXceiverServer;
 
+  private int socketKeepaliveTimeout;
   private long opStartTime; //the start time of receiving an Op
   
   public DataXceiver(Socket s, DataNode datanode, 
@@ -83,6 +88,10 @@ class DataXceiver extends DataTransferPr
     remoteAddress = s.getRemoteSocketAddress().toString();
     localAddress = s.getLocalSocketAddress().toString();
 
+    socketKeepaliveTimeout = datanode.getConf().getInt(
+        DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY,
+        DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT);
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("Number of active connections is: "
           + datanode.getXceiverCount());
@@ -113,24 +122,60 @@ class DataXceiver extends DataTransferPr
     updateCurrentThreadName("Waiting for operation");
 
     DataInputStream in=null; 
+    int opsProcessed = 0;
     try {
       in = new DataInputStream(
           new BufferedInputStream(NetUtils.getInputStream(s), 
                                   SMALL_BUFFER_SIZE));
-      final DataTransferProtocol.Op op = readOp(in);
+      int stdTimeout = s.getSoTimeout();
 
-      // Make sure the xciver count is not exceeded
-      int curXceiverCount = datanode.getXceiverCount();
-      if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
-        throw new IOException("xceiverCount " + curXceiverCount
-                              + " exceeds the limit of concurrent xcievers "
-                              + dataXceiverServer.maxXceiverCount);
-      }
+      // We process requests in a loop, and stay around for a short timeout.
+      // This optimistic behaviour allows the other end to reuse connections.
+      // Setting keepalive timeout to 0 disables this behavior.
+      do {
+        DataTransferProtocol.Op op;
+        try {
+          if (opsProcessed != 0) {
+            assert socketKeepaliveTimeout > 0;
+            s.setSoTimeout(socketKeepaliveTimeout);
+          }
+          op = readOp(in);
+        } catch (InterruptedIOException ignored) {
+          // Time out while we wait for client rpc
+          break;
+        } catch (IOException err) {
+          // Since we optimistically expect the next op, it's quite normal to get EOF here.
+          if (opsProcessed > 0 &&
+              (err instanceof EOFException || err instanceof ClosedChannelException)) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Cached " + s.toString() + " closing after " + opsProcessed + " ops");
+            }
+          } else {
+            throw err;
+          }
+          break;
+        }
+
+        // restore normal timeout
+        if (opsProcessed != 0) {
+          s.setSoTimeout(stdTimeout);
+        }
 
-      opStartTime = now();
-      processOp(op, in);
+        // Make sure the xceiver count is not exceeded
+        int curXceiverCount = datanode.getXceiverCount();
+        if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
+          throw new IOException("xceiverCount " + curXceiverCount
+                                + " exceeds the limit of concurrent xcievers "
+                                + dataXceiverServer.maxXceiverCount);
+        }
+
+        opStartTime = now();
+        processOp(op, in);
+        ++opsProcessed;
+      } while (s.isConnected() && socketKeepaliveTimeout > 0);
     } catch (Throwable t) {
-      LOG.error(datanode.getMachineName() + ":DataXceiver",t);
+      LOG.error(datanode.getMachineName() + ":DataXceiver, at " +
+        s.toString(), t);
     } finally {
       if (LOG.isDebugEnabled()) {
         LOG.debug(datanode.getMachineName() + ":Number of active connections is: "
@@ -176,18 +221,36 @@ class DataXceiver extends DataTransferPr
         blockSender = new BlockSender(block, startOffset, length,
             true, true, false, datanode, clientTraceFmt);
       } catch(IOException e) {
-        ERROR.write(out);
+        sendResponse(s, ERROR, datanode.socketWriteTimeout);
         throw e;
       }
 
       SUCCESS.write(out); // send op status
       long read = blockSender.sendBlock(out, baseStream, null); // send data
-      
+
+      if (blockSender.didSendEntireByteRange()) {
+        // If we sent the entire range, then we should expect the client
+        // to respond with a Status enum.
+        try {
+          DataTransferProtocol.Status stat = DataTransferProtocol.Status.read(in);
+          if (stat == null) {
+            LOG.warn("Client " + s.getInetAddress() + " did not send a valid status " +
+                     "code after reading. Will close connection.");
+            IOUtils.closeStream(out);
+          }
+        } catch (IOException ioe) {
+          LOG.debug("Error reading client status response. Will close connection.", ioe);
+          IOUtils.closeStream(out);
+        }
+      } else {
+        IOUtils.closeStream(out);
+      }
       datanode.metrics.incrBytesRead((int) read);
       datanode.metrics.incrBlocksRead();
     } catch ( SocketException ignored ) {
       // Its ok for remote side to close the connection anytime.
       datanode.metrics.incrBlocksRead();
+      IOUtils.closeStream(out);
     } catch ( IOException ioe ) {
       /* What exactly should we do here?
        * Earlier version shutdown() datanode if there is disk error.
@@ -198,7 +261,6 @@ class DataXceiver extends DataTransferPr
                 StringUtils.stringifyException(ioe) );
       throw ioe;
     } finally {
-      IOUtils.closeStream(out);
       IOUtils.closeStream(blockSender);
     }
 
@@ -690,12 +752,8 @@ class DataXceiver extends DataTransferPr
       long timeout) throws IOException {
     DataOutputStream reply = 
       new DataOutputStream(NetUtils.getOutputStream(s, timeout));
-    try {
-      opStatus.write(reply);
-      reply.flush();
-    } finally {
-      IOUtils.closeStream(reply);
-    }
+    opStatus.write(reply);
+    reply.flush();
   }
 
   private void checkAccess(DataOutputStream out, final boolean reply, 
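
The loop added to DataXceiver above is the datanode half of connection reuse: after the first op it waits only dfs.datanode.socket.reuse.keepalive milliseconds for a follow-on request, treats a timeout or EOF at that point as a normal close, and restores the standard timeout once a new op arrives. A stripped-down sketch of that pattern on a plain socket (op decoding, xceiver-count checks, and metrics are omitted; this is not the actual DataXceiver code):

    import java.io.DataInputStream;
    import java.io.EOFException;
    import java.io.IOException;
    import java.io.InterruptedIOException;
    import java.net.Socket;

    class KeepaliveLoopSketch {
      static void serve(Socket s, int keepaliveMillis) throws IOException {
        int stdTimeout = s.getSoTimeout();
        DataInputStream in = new DataInputStream(s.getInputStream());
        int opsProcessed = 0;
        do {
          int op;
          try {
            if (opsProcessed != 0) {
              // Wait only briefly for a follow-on request on a reused connection.
              s.setSoTimeout(keepaliveMillis);
            }
            op = in.readByte(); // stand-in for readOp(in)
          } catch (InterruptedIOException timedOut) {
            break; // idle client timed out: close quietly
          } catch (EOFException clientClosed) {
            if (opsProcessed > 0) {
              break; // EOF after at least one op is a normal close
            }
            throw clientClosed;
          }
          if (opsProcessed != 0) {
            s.setSoTimeout(stdTimeout); // restore the normal timeout
          }
          processOp(op, in); // hypothetical per-op dispatch
          ++opsProcessed;
        } while (s.isConnected() && keepaliveMillis > 0);
      }

      private static void processOp(int op, DataInputStream in) {
        // placeholder for the real op handling
      }
    }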

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Fri Jun 10 00:11:32 2011
@@ -18,9 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.BufferedInputStream;
-import java.io.DataInput;
 import java.io.DataInputStream;
-import java.io.DataOutput;
 import java.io.EOFException;
 import java.io.FilterInputStream;
 import java.io.IOException;
@@ -29,27 +27,15 @@ import java.util.Arrays;
 import java.util.zip.CheckedInputStream;
 import java.util.zip.Checksum;
 
-import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.fs.Options.Rename;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.io.WritableFactory;
-import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
 
 public class FSEditLogLoader {
   private final FSNamesystem fsNamesys;
@@ -81,7 +67,7 @@ public class FSEditLogLoader {
    */
   int readLogVersion(DataInputStream in) throws IOException {
     int logVersion = 0;
-    // Read log file version. Could be missing. 
+    // Read log file version. Could be missing.
     in.mark(4);
     // If edits log is greater than 2G, available method will return negative
     // numbers, so we avoid having to call available
@@ -97,7 +83,7 @@ public class FSEditLogLoader {
       if (logVersion < FSConstants.LAYOUT_VERSION) // future version
         throw new IOException(
             "Unexpected version of the file system log file: "
-            + logVersion + ". Current version = " 
+            + logVersion + ". Current version = "
             + FSConstants.LAYOUT_VERSION + ".");
     }
     assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
@@ -110,7 +96,7 @@ public class FSEditLogLoader {
   throws IOException {
     BufferedInputStream bin = new BufferedInputStream(edits);
     DataInputStream in = new DataInputStream(bin);
-    
+
     int numEdits = 0;
     int logVersion = 0;
 
@@ -139,9 +125,7 @@ public class FSEditLogLoader {
       throws IOException {
     FSDirectory fsDir = fsNamesys.dir;
     int numEdits = 0;
-    String clientName = null;
-    String clientMachine = null;
-    String path = null;
+
     int numOpAdd = 0, numOpClose = 0, numOpDelete = 0,
         numOpRenameOld = 0, numOpSetRepl = 0, numOpMkDir = 0,
         numOpSetPerm = 0, numOpSetOwner = 0, numOpSetGenStamp = 0,
@@ -161,26 +145,10 @@ public class FSEditLogLoader {
       long txId = expectedStartingTxId - 1;
 
       try {
-        while (true) {
-          long timestamp = 0;
-          long mtime = 0;
-          long atime = 0;
-          long blockSize = 0;
-          FSEditLogOpCodes opCode;
-          try {
-            if (checksum != null) {
-              checksum.reset();
-            }
-            in.mark(1);
-            byte opCodeByte = in.readByte();
-            opCode = FSEditLogOpCodes.fromByte(opCodeByte);
-            if (opCode == FSEditLogOpCodes.OP_INVALID) {
-              in.reset(); // reset back to end of file if somebody reads it again
-              break; // no more transactions
-            }
-          } catch (EOFException e) {
-            break; // no more transactions
-          }
+        FSEditLogOp.Reader reader = new FSEditLogOp.Reader(in, logVersion,
+                                                           checksum);
+        FSEditLogOp op;
+        while ((op = reader.readOp()) != null) {
           recentOpcodeOffsets[numEdits % recentOpcodeOffsets.length] =
               tracker.getPos();
           if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
@@ -194,34 +162,34 @@ public class FSEditLogLoader {
           }
 
           numEdits++;
-          switch (opCode) {
+          switch (op.opCode) {
           case OP_ADD:
           case OP_CLOSE: {
+            AddCloseOp addCloseOp = (AddCloseOp)op;
+
             // versions > 0 support per file replication
             // get name and replication
-            int length = in.readInt();
-            if (-7 == logVersion && length != 3||
-                -17 < logVersion && logVersion < -7 && length != 4 ||
-                logVersion <= -17 && length != 5) {
-                throw new IOException("Incorrect data format."  +
-                                      " logVersion is " + logVersion +
-                                      " but writables.length is " +
-                                      length + ". ");
-            }
-            path = FSImageSerialization.readString(in);
-            short replication = fsNamesys.adjustReplication(readShort(in));
-            mtime = readLong(in);
-            if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, logVersion)) {
-              atime = readLong(in);
+            short replication
+              = fsNamesys.adjustReplication(addCloseOp.replication);
+
+            long blockSize = addCloseOp.blockSize;
+            BlockInfo blocks[] = new BlockInfo[addCloseOp.blocks.length];
+            for (int i = 0; i < addCloseOp.blocks.length; i++) {
+              if(addCloseOp.opCode == FSEditLogOpCodes.OP_ADD
+                 && i == addCloseOp.blocks.length-1) {
+                blocks[i] = new BlockInfoUnderConstruction(addCloseOp.blocks[i],
+                                                           replication);
+              } else {
+                blocks[i] = new BlockInfo(addCloseOp.blocks[i], replication);
+              }
             }
-            if (logVersion < -7) {
-              blockSize = readLong(in);
+
+            PermissionStatus permissions = fsNamesys.getUpgradePermission();
+            if (addCloseOp.permissions != null) {
+              permissions = addCloseOp.permissions;
             }
-            // get blocks
-            boolean isFileUnderConstruction = (opCode == FSEditLogOpCodes.OP_ADD);
-            BlockInfo blocks[] = 
-              readBlocks(in, logVersion, isFileUnderConstruction, replication);
-  
+
+
             // Older versions of HDFS does not store the block size in inode.
             // If the file has more than one block, use the size of the
             // first block as the blocksize. Otherwise use the default
@@ -234,41 +202,25 @@ public class FSEditLogLoader {
                 blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
               }
             }
-             
-            PermissionStatus permissions = fsNamesys.getUpgradePermission();
-            if (logVersion <= -11) {
-              permissions = PermissionStatus.read(in);
-            }
-  
-            // clientname, clientMachine and block locations of last block.
-            if (opCode == FSEditLogOpCodes.OP_ADD && logVersion <= -12) {
-              clientName = FSImageSerialization.readString(in);
-              clientMachine = FSImageSerialization.readString(in);
-              if (-13 <= logVersion) {
-                readDatanodeDescriptorArray(in);
-              }
-            } else {
-              clientName = "";
-              clientMachine = "";
-            }
-  
+
+
             // The open lease transaction re-creates a file if necessary.
             // Delete the file if it already exists.
             if (FSNamesystem.LOG.isDebugEnabled()) {
-              FSNamesystem.LOG.debug(opCode + ": " + path + 
-                                     " numblocks : " + blocks.length +
-                                     " clientHolder " +  clientName +
-                                     " clientMachine " + clientMachine);
+              FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path +
+                  " numblocks : " + blocks.length +
+                  " clientHolder " + addCloseOp.clientName +
+                  " clientMachine " + addCloseOp.clientMachine);
             }
-  
-            fsDir.unprotectedDelete(path, mtime);
-  
+
+            fsDir.unprotectedDelete(addCloseOp.path, addCloseOp.mtime);
+
             // add to the file tree
             INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
-                                                      path, permissions,
-                                                      blocks, replication, 
-                                                      mtime, atime, blockSize);
-            if (isFileUnderConstruction) {
+                addCloseOp.path, permissions,
+                blocks, replication,
+                addCloseOp.mtime, addCloseOp.atime, blockSize);
+            if (addCloseOp.opCode == FSEditLogOpCodes.OP_ADD) {
               numOpAdd++;
               //
               // Replace current node with a INodeUnderConstruction.
@@ -276,252 +228,199 @@ public class FSEditLogLoader {
               //
               INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
                                         node.getLocalNameBytes(),
-                                        node.getReplication(), 
+                                        node.getReplication(),
                                         node.getModificationTime(),
                                         node.getPreferredBlockSize(),
                                         node.getBlocks(),
                                         node.getPermissionStatus(),
-                                        clientName, 
-                                        clientMachine, 
+                                        addCloseOp.clientName,
+                                        addCloseOp.clientMachine,
                                         null);
-              fsDir.replaceNode(path, node, cons);
-              fsNamesys.leaseManager.addLease(cons.getClientName(), path);
+              fsDir.replaceNode(addCloseOp.path, node, cons);
+              fsNamesys.leaseManager.addLease(cons.getClientName(),
+                                              addCloseOp.path);
             }
             break;
-          } 
+          }
           case OP_SET_REPLICATION: {
             numOpSetRepl++;
-            path = FSImageSerialization.readString(in);
-            short replication = fsNamesys.adjustReplication(readShort(in));
-            fsDir.unprotectedSetReplication(path, replication, null);
+            SetReplicationOp setReplicationOp = (SetReplicationOp)op;
+            short replication
+              = fsNamesys.adjustReplication(setReplicationOp.replication);
+            fsDir.unprotectedSetReplication(setReplicationOp.path,
+                                            replication, null);
             break;
-          } 
+          }
           case OP_CONCAT_DELETE: {
             numOpConcatDelete++;
-            int length = in.readInt();
-            if (length < 3) { // trg, srcs.., timestam
-              throw new IOException("Incorrect data format. " 
-                                    + "Mkdir operation.");
-            }
-            String trg = FSImageSerialization.readString(in);
-            int srcSize = length - 1 - 1; //trg and timestamp
-            String [] srcs = new String [srcSize];
-            for(int i=0; i<srcSize;i++) {
-              srcs[i]= FSImageSerialization.readString(in);
-            }
-            timestamp = readLong(in);
-            fsDir.unprotectedConcat(trg, srcs);
+
+            ConcatDeleteOp concatDeleteOp = (ConcatDeleteOp)op;
+            fsDir.unprotectedConcat(concatDeleteOp.trg, concatDeleteOp.srcs);
             break;
           }
           case OP_RENAME_OLD: {
             numOpRenameOld++;
-            int length = in.readInt();
-            if (length != 3) {
-              throw new IOException("Incorrect data format. " 
-                                    + "Mkdir operation.");
-            }
-            String s = FSImageSerialization.readString(in);
-            String d = FSImageSerialization.readString(in);
-            timestamp = readLong(in);
-            HdfsFileStatus dinfo = fsDir.getFileInfo(d, false);
-            fsDir.unprotectedRenameTo(s, d, timestamp);
-            fsNamesys.changeLease(s, d, dinfo);
+            RenameOldOp renameOp = (RenameOldOp)op;
+            HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false);
+            fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
+                                      renameOp.timestamp);
+            fsNamesys.changeLease(renameOp.src, renameOp.dst, dinfo);
             break;
           }
           case OP_DELETE: {
             numOpDelete++;
-            int length = in.readInt();
-            if (length != 2) {
-              throw new IOException("Incorrect data format. " 
-                                    + "delete operation.");
-            }
-            path = FSImageSerialization.readString(in);
-            timestamp = readLong(in);
-            fsDir.unprotectedDelete(path, timestamp);
+
+            DeleteOp deleteOp = (DeleteOp)op;
+            fsDir.unprotectedDelete(deleteOp.path, deleteOp.timestamp);
             break;
           }
           case OP_MKDIR: {
             numOpMkDir++;
+            MkdirOp mkdirOp = (MkdirOp)op;
             PermissionStatus permissions = fsNamesys.getUpgradePermission();
-            int length = in.readInt();
-            if (-17 < logVersion && length != 2 ||
-                logVersion <= -17 && length != 3) {
-              throw new IOException("Incorrect data format. " 
-                                    + "Mkdir operation.");
+            if (mkdirOp.permissions != null) {
+              permissions = mkdirOp.permissions;
             }
-            path = FSImageSerialization.readString(in);
-            timestamp = readLong(in);
-  
-            // The disk format stores atimes for directories as well.
-            // However, currently this is not being updated/used because of
-            // performance reasons.
-            if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, logVersion)) {
-              atime = readLong(in);
-            }
-  
-            if (logVersion <= -11) {
-              permissions = PermissionStatus.read(in);
-            }
-            fsDir.unprotectedMkdir(path, permissions, timestamp);
+
+            fsDir.unprotectedMkdir(mkdirOp.path, permissions,
+                                   mkdirOp.timestamp);
             break;
           }
           case OP_SET_GENSTAMP: {
             numOpSetGenStamp++;
-            long lw = in.readLong();
-            fsNamesys.setGenerationStamp(lw);
-            break;
-          } 
-          case OP_DATANODE_ADD: {
-            numOpOther++;
-            //Datanodes are not persistent any more.
-            FSImageSerialization.DatanodeImage.skipOne(in);
-            break;
-          }
-          case OP_DATANODE_REMOVE: {
-            numOpOther++;
-            DatanodeID nodeID = new DatanodeID();
-            nodeID.readFields(in);
-            //Datanodes are not persistent any more.
+            SetGenstampOp setGenstampOp = (SetGenstampOp)op;
+            fsNamesys.setGenerationStamp(setGenstampOp.genStamp);
             break;
           }
           case OP_SET_PERMISSIONS: {
             numOpSetPerm++;
-            fsDir.unprotectedSetPermission(
-                FSImageSerialization.readString(in), FsPermission.read(in));
+
+            SetPermissionsOp setPermissionsOp = (SetPermissionsOp)op;
+            fsDir.unprotectedSetPermission(setPermissionsOp.src,
+                                           setPermissionsOp.permissions);
             break;
           }
           case OP_SET_OWNER: {
             numOpSetOwner++;
-            if (logVersion > -11)
-              throw new IOException("Unexpected opCode " + opCode
-                                    + " for version " + logVersion);
-            fsDir.unprotectedSetOwner(FSImageSerialization.readString(in),
-                FSImageSerialization.readString_EmptyAsNull(in),
-                FSImageSerialization.readString_EmptyAsNull(in));
+
+            SetOwnerOp setOwnerOp = (SetOwnerOp)op;
+            fsDir.unprotectedSetOwner(setOwnerOp.src, setOwnerOp.username,
+                                      setOwnerOp.groupname);
             break;
           }
           case OP_SET_NS_QUOTA: {
-            fsDir.unprotectedSetQuota(FSImageSerialization.readString(in), 
-                                      readLongWritable(in), 
+            SetNSQuotaOp setNSQuotaOp = (SetNSQuotaOp)op;
+            fsDir.unprotectedSetQuota(setNSQuotaOp.src,
+                                      setNSQuotaOp.nsQuota,
                                       FSConstants.QUOTA_DONT_SET);
             break;
           }
           case OP_CLEAR_NS_QUOTA: {
-            fsDir.unprotectedSetQuota(FSImageSerialization.readString(in),
+            ClearNSQuotaOp clearNSQuotaOp = (ClearNSQuotaOp)op;
+            fsDir.unprotectedSetQuota(clearNSQuotaOp.src,
                                       FSConstants.QUOTA_RESET,
                                       FSConstants.QUOTA_DONT_SET);
             break;
           }
-  
+
           case OP_SET_QUOTA:
-            fsDir.unprotectedSetQuota(FSImageSerialization.readString(in),
-                                      readLongWritable(in),
-                                      readLongWritable(in));
-                                        
+            SetQuotaOp setQuotaOp = (SetQuotaOp)op;
+            fsDir.unprotectedSetQuota(setQuotaOp.src,
+                                      setQuotaOp.nsQuota,
+                                      setQuotaOp.dsQuota);
             break;
-  
+
           case OP_TIMES: {
             numOpTimes++;
-            int length = in.readInt();
-            if (length != 3) {
-              throw new IOException("Incorrect data format. " 
-                                    + "times operation.");
-            }
-            path = FSImageSerialization.readString(in);
-            mtime = readLong(in);
-            atime = readLong(in);
-            fsDir.unprotectedSetTimes(path, mtime, atime, true);
+            TimesOp timesOp = (TimesOp)op;
+
+            fsDir.unprotectedSetTimes(timesOp.path,
+                                      timesOp.mtime,
+                                      timesOp.atime, true);
             break;
           }
           case OP_SYMLINK: {
             numOpSymlink++;
-            int length = in.readInt();
-            if (length != 4) {
-              throw new IOException("Incorrect data format. " 
-                                    + "symlink operation.");
-            }
-            path = FSImageSerialization.readString(in);
-            String value = FSImageSerialization.readString(in);
-            mtime = readLong(in);
-            atime = readLong(in);
-            PermissionStatus perm = PermissionStatus.read(in);
-            fsDir.unprotectedSymlink(path, value, mtime, atime, perm);
+
+            SymlinkOp symlinkOp = (SymlinkOp)op;
+            fsDir.unprotectedSymlink(symlinkOp.path, symlinkOp.value,
+                                     symlinkOp.mtime, symlinkOp.atime,
+                                     symlinkOp.permissionStatus);
             break;
           }
           case OP_RENAME: {
             numOpRename++;
-            int length = in.readInt();
-            if (length != 3) {
-              throw new IOException("Incorrect data format. " 
-                                    + "Mkdir operation.");
-            }
-            String s = FSImageSerialization.readString(in);
-            String d = FSImageSerialization.readString(in);
-            timestamp = readLong(in);
-            Rename[] options = readRenameOptions(in);
-            HdfsFileStatus dinfo = fsDir.getFileInfo(d, false);
-            fsDir.unprotectedRenameTo(s, d, timestamp, options);
-            fsNamesys.changeLease(s, d, dinfo);
+            RenameOp renameOp = (RenameOp)op;
+
+            HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false);
+            fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
+                                      renameOp.timestamp, renameOp.options);
+            fsNamesys.changeLease(renameOp.src, renameOp.dst, dinfo);
             break;
           }
           case OP_GET_DELEGATION_TOKEN: {
             numOpGetDelegationToken++;
-            DelegationTokenIdentifier delegationTokenId = 
-                new DelegationTokenIdentifier();
-            delegationTokenId.readFields(in);
-            long expiryTime = readLong(in);
+            GetDelegationTokenOp getDelegationTokenOp
+              = (GetDelegationTokenOp)op;
+
             fsNamesys.getDelegationTokenSecretManager()
-                .addPersistedDelegationToken(delegationTokenId, expiryTime);
+              .addPersistedDelegationToken(getDelegationTokenOp.token,
+                                           getDelegationTokenOp.expiryTime);
             break;
           }
           case OP_RENEW_DELEGATION_TOKEN: {
             numOpRenewDelegationToken++;
-            DelegationTokenIdentifier delegationTokenId = 
-                new DelegationTokenIdentifier();
-            delegationTokenId.readFields(in);
-            long expiryTime = readLong(in);
+
+            RenewDelegationTokenOp renewDelegationTokenOp
+              = (RenewDelegationTokenOp)op;
             fsNamesys.getDelegationTokenSecretManager()
-                .updatePersistedTokenRenewal(delegationTokenId, expiryTime);
+              .updatePersistedTokenRenewal(renewDelegationTokenOp.token,
+                                           renewDelegationTokenOp.expiryTime);
             break;
           }
           case OP_CANCEL_DELEGATION_TOKEN: {
             numOpCancelDelegationToken++;
-            DelegationTokenIdentifier delegationTokenId = 
-                new DelegationTokenIdentifier();
-            delegationTokenId.readFields(in);
+
+            CancelDelegationTokenOp cancelDelegationTokenOp
+              = (CancelDelegationTokenOp)op;
             fsNamesys.getDelegationTokenSecretManager()
-                .updatePersistedTokenCancellation(delegationTokenId);
+                .updatePersistedTokenCancellation(
+                    cancelDelegationTokenOp.token);
             break;
           }
           case OP_UPDATE_MASTER_KEY: {
             numOpUpdateMasterKey++;
-            DelegationKey delegationKey = new DelegationKey();
-            delegationKey.readFields(in);
-            fsNamesys.getDelegationTokenSecretManager().updatePersistedMasterKey(
-                delegationKey);
+            UpdateMasterKeyOp updateMasterKeyOp = (UpdateMasterKeyOp)op;
+            fsNamesys.getDelegationTokenSecretManager()
+              .updatePersistedMasterKey(updateMasterKeyOp.key);
             break;
           }
           case OP_REASSIGN_LEASE: {
             numOpReassignLease++;
-            String leaseHolder = FSImageSerialization.readString(in);
-            path = FSImageSerialization.readString(in);
-            String newHolder = FSImageSerialization.readString(in);
-            Lease lease = fsNamesys.leaseManager.getLease(leaseHolder);
+            ReassignLeaseOp reassignLeaseOp = (ReassignLeaseOp)op;
+
+            Lease lease = fsNamesys.leaseManager.getLease(
+                reassignLeaseOp.leaseHolder);
             INodeFileUnderConstruction pendingFile =
-                (INodeFileUnderConstruction) fsDir.getFileINode(path);
-            fsNamesys.reassignLeaseInternal(lease, path, newHolder, pendingFile);
+                (INodeFileUnderConstruction) fsDir.getFileINode(
+                    reassignLeaseOp.path);
+            fsNamesys.reassignLeaseInternal(lease,
+                reassignLeaseOp.path, reassignLeaseOp.newHolder, pendingFile);
             break;
           }
           case OP_START_LOG_SEGMENT:
           case OP_END_LOG_SEGMENT: {
             // no data in here currently.
+            numOpOther++;
             break;
           }
-          default: {
-            throw new IOException("Never seen opCode " + opCode);
-          }
+          case OP_DATANODE_ADD:
+          case OP_DATANODE_REMOVE:
+            numOpOther++;
+            break;
+          default:
+            throw new IOException("Invalid operation read " + op.opCode);
           }
-          validateChecksum(in, checksum, numEdits);
         }
       } catch (IOException ex) {
         check203UpgradeFailure(logVersion, ex);
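
The hunks above replace the hand-rolled, field-by-field parsing of each edit-log record with typed edit-log ops. Condensed, the new loading loop has roughly this shape (class and field names as they appear in the diff; the surrounding bookkeeping and most opcode cases are omitted):

    FSEditLogOp.Reader reader = new FSEditLogOp.Reader(in, logVersion, checksum);
    FSEditLogOp op;
    while ((op = reader.readOp()) != null) {      // null signals end of the edit log
      numEdits++;
      switch (op.opCode) {
      case OP_DELETE: {
        // Each case downcasts to its typed op; the op's fields replace the old
        // readString()/readLong() calls against the raw stream.
        DeleteOp deleteOp = (DeleteOp) op;
        fsDir.unprotectedDelete(deleteOp.path, deleteOp.timestamp);
        break;
      }
      // ... one case per opcode (OP_ADD/OP_CLOSE, OP_MKDIR, OP_RENAME, ...) ...
      default:
        throw new IOException("Invalid operation read " + op.opCode);
      }
    }
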
@@ -569,126 +468,6 @@ public class FSEditLogLoader {
   }
 
   /**
-   * Validate a transaction's checksum
-   */
-  private static void validateChecksum(
-      DataInputStream in, Checksum checksum, int tid)
-  throws IOException {
-    if (checksum != null) {
-      int calculatedChecksum = (int)checksum.getValue();
-      int readChecksum = in.readInt(); // read in checksum
-      if (readChecksum != calculatedChecksum) {
-        throw new ChecksumException(
-            "Transaction " + tid + " is corrupt. Calculated checksum is " +
-            calculatedChecksum + " but read checksum " + readChecksum, tid);
-      }
-    }
-  }
-
-  /**
-   * A class to read in blocks stored in the old format. The only two
-   * fields in the block were blockid and length.
-   */
-  static class BlockTwo implements Writable {
-    long blkid;
-    long len;
-
-    static {                                      // register a ctor
-      WritableFactories.setFactory
-        (BlockTwo.class,
-         new WritableFactory() {
-           public Writable newInstance() { return new BlockTwo(); }
-         });
-    }
-
-
-    BlockTwo() {
-      blkid = 0;
-      len = 0;
-    }
-    /////////////////////////////////////
-    // Writable
-    /////////////////////////////////////
-    public void write(DataOutput out) throws IOException {
-      out.writeLong(blkid);
-      out.writeLong(len);
-    }
-
-    public void readFields(DataInput in) throws IOException {
-      this.blkid = in.readLong();
-      this.len = in.readLong();
-    }
-  }
-
-  /** This method is defined for compatibility reason. */
-  static private DatanodeDescriptor[] readDatanodeDescriptorArray(DataInput in
-      ) throws IOException {
-    DatanodeDescriptor[] locations = new DatanodeDescriptor[in.readInt()];
-    for (int i = 0; i < locations.length; i++) {
-      locations[i] = new DatanodeDescriptor();
-      locations[i].readFieldsFromFSEditLog(in);
-    }
-    return locations;
-  }
-
-  static private short readShort(DataInputStream in) throws IOException {
-    return Short.parseShort(FSImageSerialization.readString(in));
-  }
-
-  static private long readLong(DataInputStream in) throws IOException {
-    return Long.parseLong(FSImageSerialization.readString(in));
-  }
-  
-  // a place holder for reading a long
-  private static final LongWritable longWritable = new LongWritable();
-
-  /** Read an integer from an input stream */
-  private static long readLongWritable(DataInputStream in) throws IOException {
-    synchronized (longWritable) {
-      longWritable.readFields(in);
-      return longWritable.get();
-    }
-  }
-  
-  static Rename[] readRenameOptions(DataInputStream in) throws IOException {
-    BytesWritable writable = new BytesWritable();
-    writable.readFields(in);
-    
-    byte[] bytes = writable.getBytes();
-    Rename[] options = new Rename[bytes.length];
-    
-    for (int i = 0; i < bytes.length; i++) {
-      options[i] = Rename.valueOf(bytes[i]);
-    }
-    return options;
-  }
-
-  static private BlockInfo[] readBlocks(
-      DataInputStream in,
-      int logVersion,
-      boolean isFileUnderConstruction,
-      short replication) throws IOException {
-    int numBlocks = in.readInt();
-    BlockInfo[] blocks = new BlockInfo[numBlocks];
-    Block blk = new Block();
-    BlockTwo oldblk = new BlockTwo();
-    for (int i = 0; i < numBlocks; i++) {
-      if (logVersion <= -14) {
-        blk.readFields(in);
-      } else {
-        oldblk.readFields(in);
-        blk.set(oldblk.blkid, oldblk.len,
-                GenerationStamp.GRANDFATHER_GENERATION_STAMP);
-      }
-      if(isFileUnderConstruction && i == numBlocks-1)
-        blocks[i] = new BlockInfoUnderConstruction(blk, replication);
-      else
-        blocks[i] = new BlockInfo(blk, replication);
-    }
-    return blocks;
-  }
-  
-  /** 
    * Throw appropriate exception during upgrade from 203, when editlog loading
    * could fail due to opcode conflicts.
    */

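With per-opcode decoding now encapsulated in FSEditLogOp, the loader no longer needs its private stream helpers (readShort, readLong, readLongWritable, readRenameOptions, readBlocks, BlockTwo) or validateChecksum, which the deleted hunk above removes. FSEditLogOp.Reader itself is not shown in this diff; purely for illustration, a plausible readOp() skeleton assembled from the removed logic might look like the following, where decodeOp is a hypothetical per-opcode decoder, not a method confirmed by this commit:

    public FSEditLogOp readOp() throws IOException {
      if (checksum != null) {
        checksum.reset();                       // checksum covers one transaction
      }
      in.mark(1);
      byte opCodeByte;
      try {
        opCodeByte = in.readByte();
      } catch (EOFException e) {
        return null;                            // no more transactions
      }
      FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
      if (opCode == FSEditLogOpCodes.OP_INVALID) {
        in.reset();                             // reset back in case of a re-read
        return null;
      }
      FSEditLogOp op = decodeOp(in, opCode, logVersion);   // hypothetical helper
      if (checksum != null) {
        int calculated = (int) checksum.getValue();
        int stored = in.readInt();              // checksum trails each transaction
        if (stored != calculated) {
          throw new IOException("Transaction is corrupt. Calculated checksum is "
              + calculated + " but read checksum " + stored);
        }
      }
      return op;
    }
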
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Jun 10 00:11:32 2011
@@ -1396,62 +1396,7 @@ public class FSNamesystem implements FSC
 
     try {
       INode myFile = dir.getFileINode(src);
-      if (myFile != null && myFile.isUnderConstruction()) {
-        INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) myFile;
-        //
-        // If the file is under construction , then it must be in our
-        // leases. Find the appropriate lease record.
-        //
-        Lease lease = leaseManager.getLeaseByPath(src);
-        if (lease == null) {
-          throw new AlreadyBeingCreatedException(
-              "failed to create file " + src + " for " + holder +
-              " on client " + clientMachine + 
-              " because pendingCreates is non-null but no leases found.");
-        }
-        //
-        // We found the lease for this file. And surprisingly the original
-        // holder is trying to recreate this file. This should never occur.
-        //
-        if (lease.getHolder().equals(holder)) {
-          throw new AlreadyBeingCreatedException(
-              "failed to create file " + src + " for " + holder +
-              " on client " + clientMachine + 
-              " because current leaseholder is trying to recreate file.");
-        }
-        assert lease.getHolder().equals(pendingFile.getClientName()) :
-          "Current lease holder " + lease.getHolder() +
-          " does not match file creator " + pendingFile.getClientName();
-        //
-        // Current lease holder is different from the requester.
-        // If the original holder has not renewed in the last SOFTLIMIT 
-        // period, then start lease recovery, otherwise fail.
-        //
-        if (lease.expiredSoftLimit()) {
-          LOG.info("startFile: recover lease " + lease + ", src=" + src);
-          boolean isClosed = internalReleaseLease(lease, src, null);
-          if(!isClosed)
-            throw new RecoveryInProgressException(
-                "Failed to close file " + src +
-                ". Lease recovery is in progress. Try again later.");
-
-        } else {
-          BlockInfoUnderConstruction lastBlock=pendingFile.getLastBlock();
-          if(lastBlock != null && lastBlock.getBlockUCState() ==
-            BlockUCState.UNDER_RECOVERY) {
-            throw new RecoveryInProgressException(
-              "Recovery in progress, file [" + src + "], " +
-              "lease owner [" + lease.getHolder() + "]");
-            } else {
-              throw new AlreadyBeingCreatedException(
-                "Failed to create file [" + src + "] for [" + holder +
-                "] on client [" + clientMachine +
-                "], because this file is already being created by [" +
-                pendingFile.getClientName() + "] on [" +
-                pendingFile.getClientMachine() + "]");
-            }
-         }
-      }
+      recoverLeaseInternal(myFile, src, holder, clientMachine, false);
 
       try {
         blockManager.verifyReplication(src, replication, clientMachine);
@@ -1538,6 +1483,120 @@ public class FSNamesystem implements FSC
   }
 
   /**
+   * Recover lease;
+   * Immediately revoke the lease of the current lease holder and start lease
+   * recovery so that the file can be forced to be closed.
+   * 
+   * @param src the path of the file to start lease recovery
+   * @param holder the lease holder's name
+   * @param clientMachine the client machine's name
+   * @return true if the file is already closed
+   * @throws IOException
+   */
+  synchronized boolean recoverLease(String src, String holder, String clientMachine)
+  throws IOException {
+    if (isInSafeMode()) {
+      throw new SafeModeException(
+          "Cannot recover the lease of " + src, safeMode);
+    }
+    if (!DFSUtil.isValidName(src)) {
+      throw new IOException("Invalid file name: " + src);
+    }
+
+    INode inode = dir.getFileINode(src);
+    if (inode == null) {
+      throw new FileNotFoundException("File not found " + src);
+    }
+
+    if (!inode.isUnderConstruction()) {
+      return true;
+    }
+    if (isPermissionEnabled) {
+      checkPathAccess(src, FsAction.WRITE);
+    }
+
+    recoverLeaseInternal(inode, src, holder, clientMachine, true);
+    return false;
+  }
+
+  private void recoverLeaseInternal(INode fileInode, 
+      String src, String holder, String clientMachine, boolean force)
+  throws IOException {
+    if (fileInode != null && fileInode.isUnderConstruction()) {
+      INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) fileInode;
+      //
+      // If the file is under construction , then it must be in our
+      // leases. Find the appropriate lease record.
+      //
+      Lease lease = leaseManager.getLease(holder);
+      //
+      // We found the lease for this file. And surprisingly the original
+      // holder is trying to recreate this file. This should never occur.
+      //
+      if (!force && lease != null) {
+        Lease leaseFile = leaseManager.getLeaseByPath(src);
+        if ((leaseFile != null && leaseFile.equals(lease)) ||
+            lease.getHolder().equals(holder)) { 
+          throw new AlreadyBeingCreatedException(
+            "failed to create file " + src + " for " + holder +
+            " on client " + clientMachine + 
+            " because current leaseholder is trying to recreate file.");
+        }
+      }
+      //
+      // Find the original holder.
+      //
+      lease = leaseManager.getLease(pendingFile.getClientName());
+      if (lease == null) {
+        throw new AlreadyBeingCreatedException(
+          "failed to create file " + src + " for " + holder +
+          " on client " + clientMachine + 
+          " because pendingCreates is non-null but no leases found.");
+      }
+      if (force) {
+        // close now: no need to wait for soft lease expiration and 
+        // close only the file src
+        LOG.info("recoverLease: recover lease " + lease + ", src=" + src +
+          " from client " + pendingFile.getClientName());
+        internalReleaseLease(lease, src, holder);
+      } else {
+        assert lease.getHolder().equals(pendingFile.getClientName()) :
+          "Current lease holder " + lease.getHolder() +
+          " does not match file creator " + pendingFile.getClientName();
+        //
+        // If the original holder has not renewed in the last SOFTLIMIT 
+        // period, then start lease recovery.
+        //
+        if (lease.expiredSoftLimit()) {
+          LOG.info("startFile: recover lease " + lease + ", src=" + src +
+              " from client " + pendingFile.getClientName());
+          boolean isClosed = internalReleaseLease(lease, src, null);
+          if(!isClosed)
+            throw new RecoveryInProgressException(
+                "Failed to close file " + src +
+                ". Lease recovery is in progress. Try again later.");
+        } else {
+          BlockInfoUnderConstruction lastBlock=pendingFile.getLastBlock();
+          if(lastBlock != null && lastBlock.getBlockUCState() ==
+            BlockUCState.UNDER_RECOVERY) {
+            throw new RecoveryInProgressException(
+              "Recovery in progress, file [" + src + "], " +
+              "lease owner [" + lease.getHolder() + "]");
+            } else {
+              throw new AlreadyBeingCreatedException(
+                "Failed to create file [" + src + "] for [" + holder +
+                "] on client [" + clientMachine +
+                "], because this file is already being created by [" +
+                pendingFile.getClientName() + "] on [" +
+                pendingFile.getClientMachine() + "]");
+            }
+         }
+      }
+    }
+
+  }
+
+  /**
    * Append to an existing file in the namespace.
    */
   LocatedBlock appendFile(String src, String holder, String clientMachine)

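The recoverLease() addition above lets a client explicitly revoke a dead writer's lease and force the file closed instead of waiting for lease expiration: it returns true when the file is already closed and false once recovery has been started. A minimal client-side sketch follows, assuming the RPC is surfaced as DistributedFileSystem#recoverLease(Path) with that same boolean contract; the client-side wrapper is not part of this hunk, only the NameNode/FSNamesystem side is shown.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class RecoverLeaseExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path file = new Path(args[0]);
        DistributedFileSystem dfs =
            (DistributedFileSystem) FileSystem.get(file.toUri(), conf);
        // Poll until the NameNode reports the file closed; each call re-triggers
        // lease recovery if the previous attempt has not finished yet.
        while (!dfs.recoverLease(file)) {
          Thread.sleep(1000);
        }
        System.out.println("Lease recovered; " + file + " is closed.");
      }
    }
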
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Jun 10 00:11:32 2011
@@ -838,6 +838,12 @@ public class NameNode implements Namenod
   }
 
   /** {@inheritDoc} */
+  public boolean recoverLease(String src, String clientName) throws IOException {
+    String clientMachine = getClientMachine();
+    return namesystem.recoverLease(src, clientName, clientMachine);
+  }
+
+  /** {@inheritDoc} */
   public boolean setReplication(String src, short replication) 
     throws IOException {  
     return namesystem.setReplication(src, replication);