You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2008/05/07 03:03:42 UTC

svn commit: r653961 - /hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java

Author: rangadi
Date: Tue May  6 18:03:42 2008
New Revision: 653961

URL: http://svn.apache.org/viewvc?rev=653961&view=rev
Log:
HADOOP-3334 - forgot add the new file LeaseManager.java

Added:
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java

Added: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java?rev=653961&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java Tue May  6 18:03:42 2008
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+
+class LeaseManager {
+  static final Log LOG = FSNamesystem.LOG;
+
+  private final FSNamesystem fsnamesystem;
+
+  private long softLimit = FSConstants.LEASE_SOFTLIMIT_PERIOD;
+  private long hardLimit = FSConstants.LEASE_HARDLIMIT_PERIOD;
+
+  //
+  // Used for handling lock-leases
+  // Mapping: leaseHolder -> Lease
+  //
+  private SortedMap<StringBytesWritable, Lease> leases = new TreeMap<StringBytesWritable, Lease>();
+  // Set of: Lease
+  private SortedSet<Lease> sortedLeases = new TreeSet<Lease>();
+
+  // 
+  // Map path names to leases. It is protected by the sortedLeases lock.
+  // The map stores pathnames in lexicographical order.
+  //
+  private SortedMap<String, Lease> sortedLeasesByPath = new TreeMap<String, Lease>();
+
+  LeaseManager(FSNamesystem fsnamesystem) {this.fsnamesystem = fsnamesystem;}
+
+  Lease getLease(String holder) throws IOException {
+    return leases.get(new StringBytesWritable(holder));
+  }
+  
+  SortedSet<Lease> getSortedLeases() {return sortedLeases;}
+
+  /** @return the number of leases currently in the system */
+  synchronized int countLease() {return sortedLeases.size();}
+
+  /** @return the number of paths contained in all leases */
+  synchronized int countPath() {
+    int count = 0;
+    for(Lease lease : sortedLeases) {
+      count += lease.getPaths().size();
+    }
+    return count;
+  }
+  
+  /**
+   * Adds (or re-adds) the lease for the specified file.
+   */
+  synchronized void addLease(String src, String holder) throws IOException {
+    Lease lease = getLease(holder);
+    if (lease == null) {
+      lease = new Lease(holder);
+      leases.put(new StringBytesWritable(holder), lease);
+      sortedLeases.add(lease);
+    } else {
+      sortedLeases.remove(lease);
+      lease.renew();
+      sortedLeases.add(lease);
+    }
+    sortedLeasesByPath.put(src, lease);
+    lease.startedCreate(src);
+  }
+
+  /**
+   * deletes the lease for the specified file
+   */
+  synchronized void removeLease(String src, String holder) throws IOException {
+    Lease lease = getLease(holder);
+    if (lease != null) {
+      lease.completedCreate(src);
+      if (!lease.hasPath()) {
+        leases.remove(new StringBytesWritable(holder));
+        sortedLeases.remove(lease);
+        sortedLeasesByPath.remove(src);
+      }
+    }
+  }
+
+  /**
+   * Renew the lease(s) held by the given client
+   */
+  synchronized void renewLease(String holder) throws IOException {
+    Lease lease = getLease(holder);
+    if (lease != null) {
+      sortedLeases.remove(lease);
+      lease.renew();
+      sortedLeases.add(lease);
+    }
+  }
+
+  synchronized void handleExpiredSoftLimit(Lease lease) throws IOException {
+    lease.releaseLocks();
+    leases.remove(lease.holder);
+    LOG.info("startFile: Removing lease " + lease);
+    if (!sortedLeases.remove(lease)) {
+      LOG.error("startFile: Unknown failure trying to remove " + lease + 
+                " from lease set.");
+    }
+  }
+
+  synchronized void abandonLease(String src, String holder) throws IOException {
+    // find the lease
+    Lease lease = getLease(holder);
+    if (lease != null) {
+      // remove the file from the lease
+      if (lease.completedCreate(src)) {
+        // if we found the file in the lease, remove it from pendingCreates
+        fsnamesystem.internalReleaseCreate(src, holder);
+      } else {
+        LOG.warn("Attempt by " + holder + 
+                 " to release someone else's create lock on " + src);
+      }
+    } else {
+      LOG.warn("Attempt to release a lock from an unknown lease holder "
+               + holder + " for " + src);
+    }
+  }
+
+  /************************************************************
+   * A Lease governs all the locks held by a single client.
+   * For each client there's a corresponding lease, whose
+   * timestamp is updated when the client periodically
+   * checks in.  If the client dies and allows its lease to
+   * expire, all the corresponding locks can be released.
+   *************************************************************/
+  class Lease implements Comparable<Lease> {
+    private StringBytesWritable holder;
+    private long lastUpdate;
+    private Collection<StringBytesWritable> paths = new TreeSet<StringBytesWritable>();
+  
+    public Lease(String holder) throws IOException {
+      this.holder = new StringBytesWritable(holder);
+      renew();
+    }
+    public void renew() {
+      this.lastUpdate = FSNamesystem.now();
+    }
+
+    /** @return true if the Hard Limit Timer has expired */
+    public boolean expiredHardLimit() {
+      return FSNamesystem.now() - lastUpdate > hardLimit;
+    }
+
+    /** @return true if the Soft Limit Timer has expired */
+    public boolean expiredSoftLimit() {
+      return FSNamesystem.now() - lastUpdate > softLimit;
+    }
+
+    void startedCreate(String src) throws IOException {
+      paths.add(new StringBytesWritable(src));
+    }
+
+    boolean completedCreate(String src) throws IOException {
+      return paths.remove(new StringBytesWritable(src));
+    }
+
+    boolean hasPath() {return !paths.isEmpty();}
+
+    void releaseLocks() throws IOException {
+      String holderStr = holder.getString();
+      for(StringBytesWritable s : paths) {
+        fsnamesystem.internalReleaseCreate(s.getString(), holderStr);
+      }
+      paths.clear();
+    }
+  
+    /** {@inheritDoc} */
+    public String toString() {
+      return "[Lease.  Holder: " + holder
+          + ", pendingcreates: " + paths.size() + "]";
+    }
+  
+    /** {@inheritDoc} */
+    public int compareTo(Lease o) {
+      Lease l1 = this;
+      Lease l2 = o;
+      long lu1 = l1.lastUpdate;
+      long lu2 = l2.lastUpdate;
+      if (lu1 < lu2) {
+        return -1;
+      } else if (lu1 > lu2) {
+        return 1;
+      } else {
+        return l1.holder.compareTo(l2.holder);
+      }
+    }
+  
+    /** {@inheritDoc} */
+    public boolean equals(Object o) {
+      if (!(o instanceof Lease)) {
+        return false;
+      }
+      Lease obj = (Lease) o;
+      if (lastUpdate == obj.lastUpdate &&
+          holder.equals(obj.holder)) {
+        return true;
+      }
+      return false;
+    }
+  
+    /** {@inheritDoc} */
+    public int hashCode() {
+      return holder.hashCode();
+    }
+    
+    Collection<StringBytesWritable> getPaths() {
+      return paths;
+    }
+  
+    // If a file with the specified prefix exists, then replace 
+    // it with the new prefix.
+    //
+    void replacePrefix(String src, String overwrite, 
+                       String replaceBy) throws IOException {
+      List<StringBytesWritable> toAdd = new ArrayList<StringBytesWritable>();
+      for (Iterator<StringBytesWritable> f = paths.iterator(); 
+           f.hasNext();){
+        String path = f.next().getString();
+        if (!path.startsWith(src)) {
+          continue;
+        }
+        // remove this filename from this lease.
+        f.remove();
+  
+        // remember new filename
+        String newPath = path.replaceFirst(overwrite, replaceBy);
+        toAdd.add(new StringBytesWritable(newPath));
+        LOG.info("Modified Lease for file " + path +
+                 " to new path " + newPath);
+      }
+      // add modified filenames back into lease.
+      for (Iterator<StringBytesWritable> f = toAdd.iterator(); 
+           f.hasNext();) {
+        paths.add(f.next());
+      }
+    }
+  }
+
+  synchronized void changeLease(String src, String dst,
+      String overwrite, String replaceBy) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(getClass().getName() + ".changelease: " +
+               " src=" + src + ", dest=" + dst + 
+               ", overwrite=" + overwrite +
+               ", replaceBy=" + replaceBy);
+    }
+
+    Map<String, Lease> addTo = new TreeMap<String, Lease>();
+    SortedMap<String, Lease> myset = sortedLeasesByPath.tailMap(src);
+    for (Iterator<Map.Entry<String, Lease>> iter = myset.entrySet().iterator(); 
+         iter.hasNext();) {
+      Map.Entry<String, Lease> value = iter.next();
+      String path = (String)value.getKey();
+      if (!path.startsWith(src)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("changelease comparing " + path +
+                   " with " + src + " and terminating.");
+        }
+        break;
+      }
+      Lease lease = (Lease)value.getValue();
+
+      // Fix up all the pathnames in this lease.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("changelease comparing " + path +
+                  " with " + src + " and replacing ");
+      }
+      lease.replacePrefix(src, overwrite, replaceBy);
+
+      // Remove this lease from sortedLeasesByPath because the 
+      // pathname has changed.
+      String newPath = path.replaceFirst(overwrite, replaceBy);
+      addTo.put(newPath, lease);
+      iter.remove();
+    }
+    // re-add entries back in sortedLeasesByPath
+    sortedLeasesByPath.putAll(addTo);
+  }
+
+  void setLeasePeriod(long softLimit, long hardLimit) {
+    this.softLimit = softLimit;
+    this.hardLimit = hardLimit; 
+  }
+  
+  Monitor createMonitor() {return new Monitor();}
+
+  /******************************************************
+   * Monitor checks for leases that have expired,
+   * and disposes of them.
+   ******************************************************/
+  class Monitor implements Runnable {
+    public void run() {
+      try {
+        while (fsnamesystem.fsRunning) {
+          synchronized (fsnamesystem) {
+            synchronized (sortedLeases) {
+              Lease top;
+              while ((sortedLeases.size() > 0) &&
+                     ((top = sortedLeases.first()) != null)) {
+                if (top.expiredHardLimit()) {
+                  top.releaseLocks();
+                  leases.remove(top.holder);
+                  LOG.info("Removing lease " + top
+                      + ", leases remaining: " + sortedLeases.size());
+                  if (!sortedLeases.remove(top)) {
+                    LOG.warn("Unknown failure trying to remove " + top
+                        + " from lease set.");
+                  }
+                } else {
+                  break;
+                }
+              }
+            }
+          }
+          try {
+            Thread.sleep(2000);
+          } catch(InterruptedException ie) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(getClass().getName() + " is interrupted", ie);
+            }
+          }
+        }
+      } catch(Exception e) {
+        LOG.error("In " + getClass().getName(), e);
+      }
+    }
+  }
+}