You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2008/05/07 03:03:42 UTC
svn commit: r653961 -
/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java
Author: rangadi
Date: Tue May 6 18:03:42 2008
New Revision: 653961
URL: http://svn.apache.org/viewvc?rev=653961&view=rev
Log:
HADOOP-3334 - forgot add the new file LeaseManager.java
Added:
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java
Added: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java?rev=653961&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/LeaseManager.java Tue May 6 18:03:42 2008
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+
+class LeaseManager {
+ static final Log LOG = FSNamesystem.LOG;
+
+ private final FSNamesystem fsnamesystem;
+
+ private long softLimit = FSConstants.LEASE_SOFTLIMIT_PERIOD;
+ private long hardLimit = FSConstants.LEASE_HARDLIMIT_PERIOD;
+
+ //
+ // Used for handling lock-leases
+ // Mapping: leaseHolder -> Lease
+ //
+ private SortedMap<StringBytesWritable, Lease> leases = new TreeMap<StringBytesWritable, Lease>();
+ // Set of: Lease
+ private SortedSet<Lease> sortedLeases = new TreeSet<Lease>();
+
+ //
+ // Map path names to leases. It is protected by the sortedLeases lock.
+ // The map stores pathnames in lexicographical order.
+ //
+ private SortedMap<String, Lease> sortedLeasesByPath = new TreeMap<String, Lease>();
+
+ LeaseManager(FSNamesystem fsnamesystem) {this.fsnamesystem = fsnamesystem;}
+
+ Lease getLease(String holder) throws IOException {
+ return leases.get(new StringBytesWritable(holder));
+ }
+
+ SortedSet<Lease> getSortedLeases() {return sortedLeases;}
+
+ /** @return the number of leases currently in the system */
+ synchronized int countLease() {return sortedLeases.size();}
+
+ /** @return the number of paths contained in all leases */
+ synchronized int countPath() {
+ int count = 0;
+ for(Lease lease : sortedLeases) {
+ count += lease.getPaths().size();
+ }
+ return count;
+ }
+
+ /**
+ * Adds (or re-adds) the lease for the specified file.
+ */
+ synchronized void addLease(String src, String holder) throws IOException {
+ Lease lease = getLease(holder);
+ if (lease == null) {
+ lease = new Lease(holder);
+ leases.put(new StringBytesWritable(holder), lease);
+ sortedLeases.add(lease);
+ } else {
+ sortedLeases.remove(lease);
+ lease.renew();
+ sortedLeases.add(lease);
+ }
+ sortedLeasesByPath.put(src, lease);
+ lease.startedCreate(src);
+ }
+
+ /**
+ * deletes the lease for the specified file
+ */
+ synchronized void removeLease(String src, String holder) throws IOException {
+ Lease lease = getLease(holder);
+ if (lease != null) {
+ lease.completedCreate(src);
+ if (!lease.hasPath()) {
+ leases.remove(new StringBytesWritable(holder));
+ sortedLeases.remove(lease);
+ sortedLeasesByPath.remove(src);
+ }
+ }
+ }
+
+ /**
+ * Renew the lease(s) held by the given client
+ */
+ synchronized void renewLease(String holder) throws IOException {
+ Lease lease = getLease(holder);
+ if (lease != null) {
+ sortedLeases.remove(lease);
+ lease.renew();
+ sortedLeases.add(lease);
+ }
+ }
+
+ synchronized void handleExpiredSoftLimit(Lease lease) throws IOException {
+ lease.releaseLocks();
+ leases.remove(lease.holder);
+ LOG.info("startFile: Removing lease " + lease);
+ if (!sortedLeases.remove(lease)) {
+ LOG.error("startFile: Unknown failure trying to remove " + lease +
+ " from lease set.");
+ }
+ }
+
+ synchronized void abandonLease(String src, String holder) throws IOException {
+ // find the lease
+ Lease lease = getLease(holder);
+ if (lease != null) {
+ // remove the file from the lease
+ if (lease.completedCreate(src)) {
+ // if we found the file in the lease, remove it from pendingCreates
+ fsnamesystem.internalReleaseCreate(src, holder);
+ } else {
+ LOG.warn("Attempt by " + holder +
+ " to release someone else's create lock on " + src);
+ }
+ } else {
+ LOG.warn("Attempt to release a lock from an unknown lease holder "
+ + holder + " for " + src);
+ }
+ }
+
+ /************************************************************
+ * A Lease governs all the locks held by a single client.
+ * For each client there's a corresponding lease, whose
+ * timestamp is updated when the client periodically
+ * checks in. If the client dies and allows its lease to
+ * expire, all the corresponding locks can be released.
+ *************************************************************/
+ class Lease implements Comparable<Lease> {
+ private StringBytesWritable holder;
+ private long lastUpdate;
+ private Collection<StringBytesWritable> paths = new TreeSet<StringBytesWritable>();
+
+ public Lease(String holder) throws IOException {
+ this.holder = new StringBytesWritable(holder);
+ renew();
+ }
+ public void renew() {
+ this.lastUpdate = FSNamesystem.now();
+ }
+
+ /** @return true if the Hard Limit Timer has expired */
+ public boolean expiredHardLimit() {
+ return FSNamesystem.now() - lastUpdate > hardLimit;
+ }
+
+ /** @return true if the Soft Limit Timer has expired */
+ public boolean expiredSoftLimit() {
+ return FSNamesystem.now() - lastUpdate > softLimit;
+ }
+
+ void startedCreate(String src) throws IOException {
+ paths.add(new StringBytesWritable(src));
+ }
+
+ boolean completedCreate(String src) throws IOException {
+ return paths.remove(new StringBytesWritable(src));
+ }
+
+ boolean hasPath() {return !paths.isEmpty();}
+
+ void releaseLocks() throws IOException {
+ String holderStr = holder.getString();
+ for(StringBytesWritable s : paths) {
+ fsnamesystem.internalReleaseCreate(s.getString(), holderStr);
+ }
+ paths.clear();
+ }
+
+ /** {@inheritDoc} */
+ public String toString() {
+ return "[Lease. Holder: " + holder
+ + ", pendingcreates: " + paths.size() + "]";
+ }
+
+ /** {@inheritDoc} */
+ public int compareTo(Lease o) {
+ Lease l1 = this;
+ Lease l2 = o;
+ long lu1 = l1.lastUpdate;
+ long lu2 = l2.lastUpdate;
+ if (lu1 < lu2) {
+ return -1;
+ } else if (lu1 > lu2) {
+ return 1;
+ } else {
+ return l1.holder.compareTo(l2.holder);
+ }
+ }
+
+ /** {@inheritDoc} */
+ public boolean equals(Object o) {
+ if (!(o instanceof Lease)) {
+ return false;
+ }
+ Lease obj = (Lease) o;
+ if (lastUpdate == obj.lastUpdate &&
+ holder.equals(obj.holder)) {
+ return true;
+ }
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ public int hashCode() {
+ return holder.hashCode();
+ }
+
+ Collection<StringBytesWritable> getPaths() {
+ return paths;
+ }
+
+ // If a file with the specified prefix exists, then replace
+ // it with the new prefix.
+ //
+ void replacePrefix(String src, String overwrite,
+ String replaceBy) throws IOException {
+ List<StringBytesWritable> toAdd = new ArrayList<StringBytesWritable>();
+ for (Iterator<StringBytesWritable> f = paths.iterator();
+ f.hasNext();){
+ String path = f.next().getString();
+ if (!path.startsWith(src)) {
+ continue;
+ }
+ // remove this filename from this lease.
+ f.remove();
+
+ // remember new filename
+ String newPath = path.replaceFirst(overwrite, replaceBy);
+ toAdd.add(new StringBytesWritable(newPath));
+ LOG.info("Modified Lease for file " + path +
+ " to new path " + newPath);
+ }
+ // add modified filenames back into lease.
+ for (Iterator<StringBytesWritable> f = toAdd.iterator();
+ f.hasNext();) {
+ paths.add(f.next());
+ }
+ }
+ }
+
+ synchronized void changeLease(String src, String dst,
+ String overwrite, String replaceBy) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getClass().getName() + ".changelease: " +
+ " src=" + src + ", dest=" + dst +
+ ", overwrite=" + overwrite +
+ ", replaceBy=" + replaceBy);
+ }
+
+ Map<String, Lease> addTo = new TreeMap<String, Lease>();
+ SortedMap<String, Lease> myset = sortedLeasesByPath.tailMap(src);
+ for (Iterator<Map.Entry<String, Lease>> iter = myset.entrySet().iterator();
+ iter.hasNext();) {
+ Map.Entry<String, Lease> value = iter.next();
+ String path = (String)value.getKey();
+ if (!path.startsWith(src)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("changelease comparing " + path +
+ " with " + src + " and terminating.");
+ }
+ break;
+ }
+ Lease lease = (Lease)value.getValue();
+
+ // Fix up all the pathnames in this lease.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("changelease comparing " + path +
+ " with " + src + " and replacing ");
+ }
+ lease.replacePrefix(src, overwrite, replaceBy);
+
+ // Remove this lease from sortedLeasesByPath because the
+ // pathname has changed.
+ String newPath = path.replaceFirst(overwrite, replaceBy);
+ addTo.put(newPath, lease);
+ iter.remove();
+ }
+ // re-add entries back in sortedLeasesByPath
+ sortedLeasesByPath.putAll(addTo);
+ }
+
+ void setLeasePeriod(long softLimit, long hardLimit) {
+ this.softLimit = softLimit;
+ this.hardLimit = hardLimit;
+ }
+
+ Monitor createMonitor() {return new Monitor();}
+
+ /******************************************************
+ * Monitor checks for leases that have expired,
+ * and disposes of them.
+ ******************************************************/
+ class Monitor implements Runnable {
+ public void run() {
+ try {
+ while (fsnamesystem.fsRunning) {
+ synchronized (fsnamesystem) {
+ synchronized (sortedLeases) {
+ Lease top;
+ while ((sortedLeases.size() > 0) &&
+ ((top = sortedLeases.first()) != null)) {
+ if (top.expiredHardLimit()) {
+ top.releaseLocks();
+ leases.remove(top.holder);
+ LOG.info("Removing lease " + top
+ + ", leases remaining: " + sortedLeases.size());
+ if (!sortedLeases.remove(top)) {
+ LOG.warn("Unknown failure trying to remove " + top
+ + " from lease set.");
+ }
+ } else {
+ break;
+ }
+ }
+ }
+ }
+ try {
+ Thread.sleep(2000);
+ } catch(InterruptedException ie) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getClass().getName() + " is interrupted", ie);
+ }
+ }
+ }
+ } catch(Exception e) {
+ LOG.error("In " + getClass().getName(), e);
+ }
+ }
+ }
+}