Posted to common-commits@hadoop.apache.org by su...@apache.org on 2012/11/02 22:24:25 UTC

svn commit: r1405173 - in /hadoop/common/branches/branch-1: ./ src/core/org/apache/hadoop/ipc/ src/hdfs/org/apache/hadoop/hdfs/ src/test/org/apache/hadoop/hdfs/

Author: suresh
Date: Fri Nov  2 21:24:24 2012
New Revision: 1405173

URL: http://svn.apache.org/viewvc?rev=1405173&view=rev
Log:
Add the missing change description for HDFS-3791.

Added:
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/LeaseRenewer.java
Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Client.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1405173&r1=1405172&r2=1405173&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Fri Nov  2 21:24:24 2012
@@ -307,6 +307,10 @@ Release 1.2.0 - unreleased
     MAPREDUCE-1806. CombineFileInputFormat does not work with paths not on 
     default FS. (Gera Shegalov via tucu)
 
+    HDFS-3791. Backport of HDFS-173: Namenode no longer blocks until a large
+    directory deletion completes; it allows other operations while the
+    deletion is in progress. (umamahesh via suresh)
+
 Release 1.1.1 - Unreleased
 
   INCOMPATIBLE CHANGES

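For context, the HDFS-3791/HDFS-173 entry above amounts to deleting a large subtree in bounded batches, releasing the namesystem lock between batches so other operations can interleave, instead of holding it for the entire deletion. The sketch below only illustrates that batching idea; the class and member names are placeholders, not the actual branch-1 namenode code.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.locks.ReentrantLock;

    /** Illustrative sketch of batched deletion under a shared lock.
     *  All names are hypothetical; this is not the branch-1 implementation. */
    public class BatchedDeleteSketch {
      private static final int DELETION_INCREMENT = 1000;  // assumed batch size
      private final ReentrantLock namesystemLock = new ReentrantLock();

      int deleteAll(List<String> blocks) {
        int removed = 0;
        for (int i = 0; i < blocks.size(); i += DELETION_INCREMENT) {
          namesystemLock.lock();              // hold the lock for one batch only
          try {
            int end = Math.min(i + DELETION_INCREMENT, blocks.size());
            for (String blk : blocks.subList(i, end)) {
              removed++;  // the real code would remove blk from the block map here
            }
          } finally {
            namesystemLock.unlock();          // other namenode operations may run here
          }
        }
        return removed;
      }

      public static void main(String[] args) {
        List<String> blocks = new ArrayList<String>();
        for (int i = 0; i < 5000; i++) {
          blocks.add("blk_" + i);
        }
        System.out.println(new BatchedDeleteSketch().deleteAll(blocks) + " blocks deleted");
      }
    }
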
Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Client.java?rev=1405173&r1=1405172&r2=1405173&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Client.java Fri Nov  2 21:24:24 2012
@@ -109,7 +109,7 @@ public class Client {
    * @param conf Configuration
    * @return the ping interval
    */
-  final static int getPingInterval(Configuration conf) {
+  public final static int getPingInterval(Configuration conf) {
     return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL);
   }
   

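The only change above is visibility: getPingInterval() is made public so that DFSClient (next file) can size the new LeaseRenewer from the RPC ping interval. A minimal sketch of the call, assuming the stock configuration where ipc.ping.interval defaults to one minute:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.Client;

    public class PingIntervalSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Returns ipc.ping.interval, or the 60000 ms default if unset.
        int pingInterval = Client.getPingInterval(conf);
        System.out.println("lease renewer timeout = " + pingInterval + " ms");
      }
    }
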
Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1405173&r1=1405172&r2=1405173&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Nov  2 21:24:24 2012
@@ -86,7 +86,7 @@ public class DFSClient implements FSCons
   volatile boolean clientRunning = true;
   Random r = new Random();
   final String clientName;
-  final LeaseChecker leasechecker = new LeaseChecker();
+  final LeaseRenewer leaserenewer;
   private Configuration conf;
   private long defaultBlockSize;
   private short defaultReplication;
@@ -250,6 +250,9 @@ public class DFSClient implements FSCons
     // dfs.write.packet.size is an internal config variable
     this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
     this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);
+    
+    // TODO: review this
+    leaserenewer = new LeaseRenewer(this, Client.getPingInterval(conf));
 
     ugi = UserGroupInformation.getCurrentUser();
 
@@ -317,10 +320,10 @@ public class DFSClient implements FSCons
    */
   public synchronized void close() throws IOException {
     if(clientRunning) {
-      leasechecker.close();
+      leaserenewer.close();
       clientRunning = false;
       try {
-        leasechecker.interruptAndJoin();
+        leaserenewer.interruptAndJoin();
       } catch (InterruptedException ie) {
       }
   
@@ -760,7 +763,7 @@ public class DFSClient implements FSCons
     OutputStream result = new DFSOutputStream(src, masked,
         overwrite, createParent, replication, blockSize, progress, buffersize,
         conf.getInt("io.bytes.per.checksum", 512));
-    leasechecker.put(src, result);
+    leaserenewer.put(src, result);
     return result;
   }
 
@@ -815,7 +818,7 @@ public class DFSClient implements FSCons
     }
     final DFSOutputStream result = new DFSOutputStream(src, buffersize, progress,
         lastBlock, stat, conf.getInt("io.bytes.per.checksum", 512));
-    leasechecker.put(src, result);
+    leaserenewer.put(src, result);
     return result;
   }
 
@@ -1392,115 +1395,8 @@ public class DFSClient implements FSCons
     throw new IOException("No live nodes contain current block");
   }
 
-  boolean isLeaseCheckerStarted() {
-    return leasechecker.daemon != null;
-  }
-
-  /** Lease management*/
-  class LeaseChecker implements Runnable {
-    /** A map from src -> DFSOutputStream of files that are currently being
-     * written by this client.
-     */
-    private final SortedMap<String, OutputStream> pendingCreates
-        = new TreeMap<String, OutputStream>();
-
-    private Daemon daemon = null;
-    
-    synchronized void put(String src, OutputStream out) {
-      if (clientRunning) {
-        if (daemon == null) {
-          daemon = new Daemon(this);
-          daemon.start();
-        }
-        pendingCreates.put(src, out);
-      }
-    }
-    
-    synchronized void remove(String src) {
-      pendingCreates.remove(src);
-    }
-    
-    void interruptAndJoin() throws InterruptedException {
-      Daemon daemonCopy = null;
-      synchronized (this) {
-        if (daemon != null) {
-          daemon.interrupt();
-          daemonCopy = daemon;
-        }
-      }
-     
-      if (daemonCopy != null) {
-        LOG.debug("Wait for lease checker to terminate");
-        daemonCopy.join();
-      }
-    }
-
-    void close() {
-      while (true) {
-        String src;
-        OutputStream out;
-        synchronized (this) {
-          if (pendingCreates.isEmpty()) {
-            return;
-          }
-          src = pendingCreates.firstKey();
-          out = pendingCreates.remove(src);
-        }
-        if (out != null) {
-          try {
-            out.close();
-          } catch (IOException ie) {
-            LOG.error("Exception closing file " + src+ " : " + ie, ie);
-          }
-        }
-      }
-    }
-
-    private void renew() throws IOException {
-      synchronized(this) {
-        if (pendingCreates.isEmpty()) {
-          return;
-        }
-      }
-      namenode.renewLease(clientName);
-    }
-
-    /**
-     * Periodically check in with the namenode and renew all the leases
-     * when the lease period is half over.
-     */
-    public void run() {
-      long lastRenewed = 0;
-      while (clientRunning && !Thread.interrupted()) {
-        if (System.currentTimeMillis() - lastRenewed > (LEASE_SOFTLIMIT_PERIOD / 2)) {
-          try {
-            renew();
-            lastRenewed = System.currentTimeMillis();
-          } catch (IOException ie) {
-            LOG.warn("Problem renewing lease for " + clientName, ie);
-          }
-        }
-
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException ie) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(this + " is interrupted.", ie);
-          }
-          return;
-        }
-      }
-    }
-
-    /** {@inheritDoc} */
-    public String toString() {
-      String s = getClass().getSimpleName();
-      if (LOG.isTraceEnabled()) {
-        return s + "@" + DFSClient.this + ": "
-               + StringUtils.stringifyException(new Throwable("for testing"));
-      }
-      return s;
-    }
+  boolean isLeaseRenewerStarted() {
+    return leaserenewer.isRunning();
   }
 
   /** Utility class to encapsulate data node info and its address. */
@@ -3994,7 +3890,7 @@ public class DFSClient implements FSCons
           throw e;
       }
       closeInternal();
-      leasechecker.remove(src);
+      leaserenewer.remove(src);
       
       if (s != null) {
         s.close();
@@ -4011,7 +3907,7 @@ public class DFSClient implements FSCons
       response.close();
       closed = true;
     }
- 
+    
     // shutdown datastreamer and responseprocessor threads.
     private void closeThreads() throws IOException {
       try {

Added: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/LeaseRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/LeaseRenewer.java?rev=1405173&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/LeaseRenewer.java (added)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/LeaseRenewer.java Fri Nov  2 21:24:24 2012
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.SocketTimeoutException;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
+
+public class LeaseRenewer {
+  private static final Log LOG = LogFactory.getLog(LeaseRenewer.class);
+
+  static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
+  static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
+  /** A map from src -> DFSOutputStream of files that are currently being
+   * written by this client.
+   */
+  private final SortedMap<String, OutputStream> pendingCreates
+      = new TreeMap<String, OutputStream>();
+  /** The time in milliseconds that the map became empty. */
+  private long emptyTime = Long.MAX_VALUE;
+  /** A fixed lease renewal time period in milliseconds */
+  private final long renewal;
+
+  /** A daemon for renewing lease */
+  private Daemon daemon = null;
+  /** Only the daemon with currentId should run. */
+  private int currentId = 0;
+
+  /** 
+   * A period in milliseconds that the lease renewer thread should run
+   * after the map became empty.
+   * If the map is empty for a time period longer than the grace period,
+   * the renewer should terminate.  
+   */
+  private long gracePeriod;
+  /**
+   * The time period in milliseconds
+   * that the renewer sleeps for each iteration. 
+   */
+  private volatile long sleepPeriod;
+
+  private final DFSClient dfsclient;
+
+  LeaseRenewer(final DFSClient dfsclient, final long timeout) {
+    this.dfsclient = dfsclient;
+    this.renewal = (timeout > 0 && timeout < FSConstants.LEASE_SOFTLIMIT_PERIOD)? 
+        timeout/2: FSConstants.LEASE_SOFTLIMIT_PERIOD/2;
+    setGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT);
+  }
+
+  /** Set the grace period and adjust the sleep period accordingly. */
+  void setGraceSleepPeriod(final long gracePeriod) {
+    if (gracePeriod < 100L) {
+      throw new IllegalArgumentException(gracePeriod
+          + " = gracePeriod < 100ms is too small.");
+    }
+    synchronized(this) {
+      this.gracePeriod = gracePeriod;
+    }
+    final long half = gracePeriod/2;
+    this.sleepPeriod = half < LEASE_RENEWER_SLEEP_DEFAULT?
+        half: LEASE_RENEWER_SLEEP_DEFAULT;
+  }
+
+  /** Is the daemon running? */
+  synchronized boolean isRunning() {
+    return daemon != null && daemon.isAlive();
+  }
+
+  /** Is the empty period longer than the grace period? */  
+  private synchronized boolean isRenewerExpired() {
+    return emptyTime != Long.MAX_VALUE
+        && System.currentTimeMillis() - emptyTime > gracePeriod;
+  }
+
+  synchronized void put(String src, OutputStream out) {
+    if (dfsclient.clientRunning) {
+      if (daemon == null || isRenewerExpired()) {
+        //start a new daemon with a new id.
+        final int id = ++currentId;
+        daemon = new Daemon(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              LeaseRenewer.this.run(id);
+            } catch(InterruptedException e) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(LeaseRenewer.this.getClass().getSimpleName()
+                    + " is interrupted.", e);
+              }
+            }
+          }
+        });
+        daemon.start();
+      }
+      pendingCreates.put(src, out);
+      emptyTime = Long.MAX_VALUE;
+    }
+  }
+  
+  synchronized void remove(String src) {
+    pendingCreates.remove(src);
+    if (pendingCreates.isEmpty() && emptyTime == Long.MAX_VALUE) {
+      //discover the first time that the map is empty.
+      emptyTime = System.currentTimeMillis();
+    }
+  }
+  
+  void interruptAndJoin() throws InterruptedException {
+    Daemon daemonCopy = null;
+    synchronized (this) {
+      if (isRunning()) {
+        daemon.interrupt();
+        daemonCopy = daemon;
+      }
+    }
+   
+    if (daemonCopy != null) {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Wait for lease checker to terminate");
+      }
+      daemonCopy.join();
+    }
+  }
+
+  void close() {
+    while (true) {
+      String src;
+      OutputStream out;
+      synchronized (this) {
+        if (pendingCreates.isEmpty()) {
+          return;
+        }
+        src = pendingCreates.firstKey();
+        out = pendingCreates.remove(src);
+      }
+      if (out != null) {
+        try {
+          out.close();
+        } catch (IOException ie) {
+          LOG.error("Exception closing file " + src+ " : " + ie, ie);
+        }
+      }
+    }
+  }
+
+  /**
+   * Abort all open files. Release resources held. Ignore all errors.
+   */
+  synchronized void abort() {
+    dfsclient.clientRunning = false;
+    while (!pendingCreates.isEmpty()) {
+      String src = pendingCreates.firstKey();
+      DFSClient.DFSOutputStream out = (DFSClient.DFSOutputStream) pendingCreates
+          .remove(src);
+      if (out != null) {
+          // TODO:
+//        try {
+//          out.abort();
+//          
+//        } catch (IOException ie) {
+//          LOG.error("Exception aborting file " + src+ ": ", ie);
+//        }
+      }
+    }
+    RPC.stopProxy(dfsclient.namenode); // close connections to the namenode
+  }
+
+  private void renew() throws IOException {
+    synchronized(this) {
+      if (pendingCreates.isEmpty()) {
+        return;
+      }
+    }
+    dfsclient.namenode.renewLease(dfsclient.clientName);
+  }
+
+  /**
+   * Periodically check in with the namenode and renew all the leases
+   * when the lease period is half over.
+   */
+  private void run(final int id) throws InterruptedException {
+    for(long lastRenewed = System.currentTimeMillis();
+        dfsclient.clientRunning && !Thread.interrupted();
+        Thread.sleep(sleepPeriod)) {
+      if (System.currentTimeMillis() - lastRenewed >= renewal) {
+        try {
+          renew();
+          lastRenewed = System.currentTimeMillis();
+        } catch (SocketTimeoutException ie) {
+          LOG.warn("Failed to renew lease for " + dfsclient.clientName + " for "
+              + (renewal/1000) + " seconds.  Aborting ...", ie);
+          abort();
+          break;
+        } catch (IOException ie) {
+          LOG.warn("Failed to renew lease for " + dfsclient.clientName + " for "
+              + (renewal/1000) + " seconds.  Will retry shortly ...", ie);
+        }
+      }
+
+      synchronized(this) {
+        if (id != currentId || isRenewerExpired()) {
+          //no longer the current daemon or expired
+          return;
+        }
+      }
+    }
+  }
+
+  /** {@inheritDoc} */
+  public String toString() {
+    String s = getClass().getSimpleName();
+    if (LOG.isTraceEnabled()) {
+      return s + "@" + dfsclient + ": "
+             + StringUtils.stringifyException(new Throwable("for testing"));
+    }
+    return s;
+  }
+}

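To make the new constants concrete: the constructor picks half the supplied timeout as the renewal period when that timeout is positive and below the lease soft limit, otherwise half the soft limit, and setGraceSleepPeriod() caps the per-iteration sleep at one second. The standalone sketch below mirrors that arithmetic; the 60-second soft-limit value is an assumption matching FSConstants.LEASE_SOFTLIMIT_PERIOD in branch-1.

    public class RenewalMathSketch {
      // Assumed value of FSConstants.LEASE_SOFTLIMIT_PERIOD in branch-1, in ms.
      static final long LEASE_SOFTLIMIT_PERIOD = 60 * 1000L;
      static final long GRACE_DEFAULT = 60 * 1000L;  // LEASE_RENEWER_GRACE_DEFAULT
      static final long SLEEP_DEFAULT = 1000L;       // LEASE_RENEWER_SLEEP_DEFAULT

      /** Mirrors the LeaseRenewer constructor above. */
      static long renewal(long timeout) {
        return (timeout > 0 && timeout < LEASE_SOFTLIMIT_PERIOD)
            ? timeout / 2 : LEASE_SOFTLIMIT_PERIOD / 2;
      }

      /** Mirrors setGraceSleepPeriod() above. */
      static long sleepPeriod(long gracePeriod) {
        final long half = gracePeriod / 2;
        return half < SLEEP_DEFAULT ? half : SLEEP_DEFAULT;
      }

      public static void main(String[] args) {
        // With the default 60s ping interval the renewal falls back to half
        // the soft limit (30s); a 10s RPC timeout would renew every 5s.
        System.out.println(renewal(60 * 1000L));         // 30000
        System.out.println(renewal(10 * 1000L));         // 5000
        System.out.println(sleepPeriod(GRACE_DEFAULT));  // 1000
      }
    }

Together with the grace period, this daemon lifecycle is what lets isLeaseRenewerStarted() in DFSClient become false again once every pending create has been closed for longer than the grace period.
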
Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java?rev=1405173&r1=1405172&r2=1405173&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java Fri Nov  2 21:24:24 2012
@@ -165,7 +165,7 @@ public class AppendTestUtil {
     LOG.info("leasechecker.interruptAndJoin()");
     // lose the lease on the client
     DistributedFileSystem dfs = (DistributedFileSystem)whichfs;
-    dfs.dfs.leasechecker.interruptAndJoin();
+    dfs.dfs.leaserenewer.interruptAndJoin();
   }
   
   public static void recoverFile(MiniDFSCluster cluster, FileSystem fs,

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=1405173&r1=1405172&r2=1405173&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Fri Nov  2 21:24:24 2012
@@ -111,31 +111,31 @@ public class TestDistributedFileSystem {
 
       {
         DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        assertFalse(dfs.dfs.isLeaseRenewerStarted());
   
         //create a file
         FSDataOutputStream out = dfs.create(filepath);
-        assertTrue(dfs.dfs.isLeaseCheckerStarted());
+        assertTrue(dfs.dfs.isLeaseRenewerStarted());
   
         //write something and close
         out.writeLong(millis);
-        assertTrue(dfs.dfs.isLeaseCheckerStarted());
+        assertTrue(dfs.dfs.isLeaseRenewerStarted());
         out.close();
-        assertTrue(dfs.dfs.isLeaseCheckerStarted());
+        assertTrue(dfs.dfs.isLeaseRenewerStarted());
         dfs.close();
       }
 
       {
         DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        assertFalse(dfs.dfs.isLeaseRenewerStarted());
 
         //open and check the file
         FSDataInputStream in = dfs.open(filepath);
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        assertFalse(dfs.dfs.isLeaseRenewerStarted());
         assertEquals(millis, in.readLong());
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        assertFalse(dfs.dfs.isLeaseRenewerStarted());
         in.close();
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        assertFalse(dfs.dfs.isLeaseRenewerStarted());
         dfs.close();
       }
     }

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java?rev=1405173&r1=1405172&r2=1405173&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java Fri Nov  2 21:24:24 2012
@@ -656,7 +656,7 @@ public class TestFileAppend4 extends Tes
       // has not been completed in the NN.
       // Lose the leases
       LOG.info("Killing lease checker");
-      client.leasechecker.interruptAndJoin();
+      client.leaserenewer.interruptAndJoin();
 
       FileSystem fs1 = cluster.getFileSystem();
       FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(
@@ -726,7 +726,7 @@ public class TestFileAppend4 extends Tes
       // has not been completed in the NN.
       // Lose the leases
       LOG.info("Killing lease checker");
-      client.leasechecker.interruptAndJoin();
+      client.leaserenewer.interruptAndJoin();
 
       FileSystem fs1 = cluster.getFileSystem();
       FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java?rev=1405173&r1=1405172&r2=1405173&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java Fri Nov  2 21:24:24 2012
@@ -157,7 +157,7 @@ public class TestLeaseRecovery2 extends 
     stm.sync();
     if (triggerSoftLease) {
       AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
-      dfs.dfs.leasechecker.interruptAndJoin();
+      dfs.dfs.leaserenewer.interruptAndJoin();
     }
     return filepath;
   }