Posted to hdfs-commits@hadoop.apache.org by dh...@apache.org on 2009/07/13 08:37:18 UTC

svn commit: r793469 - in /hadoop/hdfs/trunk: CHANGES.txt src/java/org/apache/hadoop/hdfs/DFSClient.java src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java

Author: dhruba
Date: Mon Jul 13 06:37:18 2009
New Revision: 793469

URL: http://svn.apache.org/viewvc?rev=793469&view=rev
Log:
HDFS-278. HDFS Outputstream close does not hang forever. (dhruba)
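In short, this change threads a client-side timeout (hdfsTimeout, taken from the IPC timeout) through DFSClient so that lease renewal and file close give up with an IOException instead of retrying forever. The following is an illustrative sketch of that bounded-retry idea only, not code from this commit; the fields namenode, clientName and hdfsTimeout are assumed to exist as they do in DFSClient:

    // Sketch: poll the namenode until the file is complete, but stop once
    // hdfsTimeout has elapsed instead of looping indefinitely.
    private void waitForComplete(String src) throws IOException {
      long localstart = System.currentTimeMillis();
      while (!namenode.complete(src, clientName)) {
        if (hdfsTimeout > 0 &&
            localstart + hdfsTimeout < System.currentTimeMillis()) {
          throw new IOException("Unable to close file " + src +
                                ": no response from the namenode within " +
                                (hdfsTimeout / 1000) + " seconds");
        }
        try {
          Thread.sleep(400);   // back off before polling again
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted while closing " + src);
        }
      }
    }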


Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=793469&r1=793468&r2=793469&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Mon Jul 13 06:37:18 2009
@@ -32,6 +32,8 @@
     HDFS-204. Add a new metrics FilesInGetListingOps to the Namenode.
     (Jitendra Nath Pandey via szetszwo)
 
+    HDFS-278. HDFS Outputstream close does not hang forever. (dhruba)
+
   BUG FIXES
     HDFS-76. Better error message to users when commands fail because of 
     lack of quota. Allow quota to be set even if the limit is lower than

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=793469&r1=793468&r2=793469&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Mon Jul 13 06:37:18 2009
@@ -85,8 +85,12 @@
   final int writePacketSize;
   private final FileSystem.Statistics stats;
   private int maxBlockAcquireFailures;
-    
- 
+  private final int hdfsTimeout;    // timeout value for a DFS operation.
+
+  /**
+   * The locking hierarchy is to first acquire lock on DFSClient object, followed by 
+   * lock on leasechecker, followed by lock on an individual DFSOutputStream.
+   */
   public static ClientProtocol createNamenode(Configuration conf) throws IOException {
     return createNamenode(NameNode.getAddress(conf), conf);
   }
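The new Javadoc above spells out the locking hierarchy the client must follow. A minimal illustration of that ordering (placeholder variable names, not code from this patch):

    // Acquire locks in the documented order to avoid deadlock:
    // the DFSClient object first, then the lease checker, then an
    // individual output stream.
    synchronized (dfsClient) {
      synchronized (leaseChecker) {
        synchronized (outputStream) {
          // per-stream state may be updated safely here
        }
      }
    }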
@@ -169,7 +173,9 @@
     this.maxBlockAcquireFailures = 
                           conf.getInt("dfs.client.max.block.acquire.failures",
                                       MAX_BLOCK_ACQUIRE_FAILURES);
-    
+    // The hdfsTimeout is currently the same as the ipc timeout 
+    this.hdfsTimeout = Client.getTimeout(conf);
+
     try {
       this.ugi = UnixUserGroupInformation.login(conf, true);
     } catch (LoginException e) {
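The new hdfsTimeout field simply mirrors the IPC client timeout, so its effective value is controlled through the IPC configuration, as the new test below does. A small example of that relationship (the concrete values are illustrative, not defaults introduced by this patch):

    // With pings disabled, the ping interval acts as the RPC timeout,
    // which is what Client.getTimeout(conf) reports to DFSClient.
    Configuration conf = new Configuration();
    conf.setBoolean("ipc.client.ping", false);   // disable keep-alive pings
    conf.setInt("ipc.ping.interval", 10000);     // 10 second RPC timeout
    int hdfsTimeout = Client.getTimeout(conf);   // expected: 10000 ms here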
@@ -989,6 +995,25 @@
       }
     }
 
+    /**
+     * Abort all open files. Release resources held. Ignore all errors.
+     */
+    synchronized void abort() {
+      clientRunning = false;
+      while (!pendingCreates.isEmpty()) {
+        String src = pendingCreates.firstKey();
+        DFSOutputStream out = (DFSOutputStream)pendingCreates.remove(src);
+        if (out != null) {
+          try {
+            out.abort();
+          } catch (IOException ie) {
+            LOG.error("Exception aborting file " + src+ ": ", ie);
+          }
+        }
+      }
+      RPC.stopProxy(rpcNamenode); // close connections to the namenode
+    }
+
     private void renew() throws IOException {
       synchronized(this) {
         if (pendingCreates.isEmpty()) {
@@ -1004,13 +1029,25 @@
      */
     public void run() {
       long lastRenewed = 0;
+      int renewal = (int)(LEASE_SOFTLIMIT_PERIOD / 2);
+      if (hdfsTimeout > 0) {
+        renewal = Math.min(renewal, hdfsTimeout/2);
+      }
       while (clientRunning && !Thread.interrupted()) {
-        if (System.currentTimeMillis() - lastRenewed > (LEASE_SOFTLIMIT_PERIOD / 2)) {
+        if (System.currentTimeMillis() - lastRenewed > renewal) {
           try {
             renew();
             lastRenewed = System.currentTimeMillis();
+          } catch (SocketTimeoutException ie) {
+            LOG.warn("Problem renewing lease for " + clientName +
+                     " for a period of " + (hdfsTimeout/1000) +
+                     " seconds. Shutting down HDFS client...", ie);
+            abort();
+            break;
           } catch (IOException ie) {
-            LOG.warn("Problem renewing lease for " + clientName, ie);
+            LOG.warn("Problem renewing lease for " + clientName +
+                     " for a period of " + (hdfsTimeout/1000) +
+                     " seconds. Will retry shortly...", ie);
           }
         }
 
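For a concrete sense of the new renewal interval: assuming the usual 60 second lease soft limit and the 10 second hdfsTimeout configured in the test below, the renewer thread now wakes up every 5 seconds rather than every 30. A worked example with those assumed values:

    // Assumed values for illustration: LEASE_SOFTLIMIT_PERIOD = 60000 ms,
    // hdfsTimeout = 10000 ms.
    int renewal = (int) (60000 / 2);          // 30000 ms from the soft limit
    renewal = Math.min(renewal, 10000 / 2);   // 5000 ms once the RPC timeout is considered
    // => the lease is renewed every 5 seconds, well inside both limits.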
@@ -3141,6 +3178,19 @@
       }
     }
 
+    /**
+     * Aborts this output stream and releases any system 
+     * resources associated with this stream.
+     */
+    synchronized void abort() throws IOException {
+      if (closed) {
+        return;
+      }
+      streamer.setLastException(new IOException("Lease timeout of " +
+                               (hdfsTimeout/1000) + " seconds expired."));
+      closeThreads();
+    }
+ 
     // shutdown datastreamer and responseprocessor threads.
     private void closeThreads() throws IOException {
       try {
@@ -3204,6 +3254,16 @@
       while (!fileComplete) {
         fileComplete = namenode.complete(src, clientName);
         if (!fileComplete) {
+          if (!clientRunning ||
+                (hdfsTimeout > 0 &&
+                 localstart + hdfsTimeout < System.currentTimeMillis())) {
+              String msg = "Unable to close file because dfsclient " +
+                            " was unable to contact the HDFS servers." +
+                            " clientRunning " + clientRunning +
+                            " hdfsTimeout " + hdfsTimeout;
+              LOG.info(msg);
+              throw new IOException(msg);
+          }
           try {
             Thread.sleep(400);
             if (System.currentTimeMillis() - localstart > 5000) {

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java?rev=793469&r1=793468&r2=793469&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java Mon Jul 13 06:37:18 2009
@@ -842,6 +842,53 @@
       dfs.close();
     } finally {
       System.out.println("testFsClose successful");
+      cluster.shutdown();
+    }
+  }
+
+  // test closing file after cluster is shutdown
+  public void testFsCloseAfterClusterShutdown() throws IOException {
+    System.out.println("test testFsCloseAfterClusterShutdown start");
+    final int DATANODE_NUM = 3;
+
+    Configuration conf = new Configuration();
+    conf.setInt("dfs.replication.min", 3);
+    conf.setBoolean("ipc.client.ping", false); // hdfs timeout is default 60 seconds
+    conf.setInt("ipc.ping.interval", 10000); // hdfs timeout is now 10 second
+
+    // create cluster
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, DATANODE_NUM, true, null);
+    DistributedFileSystem dfs = null;
+    try {
+      cluster.waitActive();
+      dfs = (DistributedFileSystem)cluster.getFileSystem();
+
+      // create a new file.
+      final String f = DIR + "dhrubashutdown";
+      final Path fpath = new Path(f);
+      FSDataOutputStream out = TestFileCreation.createFile(dfs, fpath, DATANODE_NUM);
+      out.write("something_dhruba".getBytes());
+      out.sync();    // ensure that block is allocated
+
+      // shutdown last datanode in pipeline.
+      cluster.stopDataNode(2);
+
+      // close file. Since we have set the minReplication to 3 but have killed one
+      // of the three datanodes, the close call will loop until the hdfsTimeout is
+      // encountered.
+      boolean hasException = false;
+      try {
+        out.close();
+        System.out.println("testFsCloseAfterClusterShutdown: Error here");
+      } catch (IOException e) {
+        hasException = true;
+      }
+      assertTrue("Failed to close file after cluster shutdown", hasException);
+    } finally {
+      System.out.println("testFsCloseAfterClusterShutdown successful");
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }
 }
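With this change an application can now see close() fail with an IOException instead of hanging when the namenode is unreachable. A minimal sketch of handling that on the application side (illustrative only; fs, path and data are assumed to exist, and the recovery policy is not part of this commit):

    FSDataOutputStream out = fs.create(path);
    try {
      out.write(data);
      out.close();     // may now throw instead of looping forever
    } catch (IOException e) {
      System.err.println("close failed; file may be incomplete: " + path);
      // application-specific recovery, e.g. remove the partial file and retry
    }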