You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2014/05/09 09:03:19 UTC
svn commit: r1593493 [11/24] - in /river/jtsk/skunk/qa_refactor/trunk: qa/
qa/src/com/sun/jini/test/impl/end2end/jssewrapper/
qa/src/com/sun/jini/test/impl/joinmanager/
qa/src/com/sun/jini/test/impl/mahalo/
qa/src/com/sun/jini/test/impl/outrigger/match...
Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/norm/LRMEventListener.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/norm/LRMEventListener.java?rev=1593493&r1=1593492&r2=1593493&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/norm/LRMEventListener.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/norm/LRMEventListener.java Fri May 9 07:03:18 2014
@@ -1,168 +1,160 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.sun.jini.norm;
-
-import com.sun.jini.thread.InterruptedStatusThread;
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import net.jini.core.lease.Lease;
-import net.jini.core.lease.LeaseDeniedException;
-import net.jini.lease.DesiredExpirationListener;
-import net.jini.lease.LeaseRenewalEvent;
-
-/**
- * Object that transfers events from the Lease Renewal Manager to the
- * rest of Norm Server. Isolates the renewal manager from having to
- * block on the snapshot locks.
- *
- * @author Sun Microsystems, Inc.
- */
-class LRMEventListener extends InterruptedStatusThread
- implements DesiredExpirationListener
-{
- /** Logger for logging messages for this class */
- private static final Logger logger = Logger.getLogger("com.sun.jini.norm");
-
- /** Ref to the main server object which has all the top level methods */
- volatile private NormServerBaseImpl server;
-
- /**
- * Queue we use to decouple the reception of events from the lease
- * renewal manager from the scheduling of the sending of remote
- * events and modifying our internal tables (which both require
- * obtaining serious locks).
- */
- final private Queue queue = new Queue();
-
- /** Any events that hold this object are ignored */
- final static LeaseDeniedException EXPIRED_SET_EXCEPTION =
- new LeaseDeniedException("Set Expired");
-
- /**
- * Simple constructor
- *
- * @param server Object that will make the actual internal updates and
- * schedule the sending of remote events
- */
- LRMEventListener(NormServerBaseImpl server) {
- super("LRM Event Listener");
- setDaemon(true);
- this.server = server;
- }
-
- LRMEventListener() {
- super("LRM Event Listener");
- setDaemon(true);
- }
-
- /**
- * Set only once after construction.
- * @param server
- */
- void setServer(NormServerBaseImpl server){
- synchronized (this){
- if (this.server == null) this.server = server;
- }
- }
-
- //////////////////////////////////////////////////
- // Methods required by the LeaseListener interface
-
- // Inherit java doc from super type
- public void notify(LeaseRenewalEvent e) {
- // Drop if the exception field is == to EXPIRED_SET_EXCEPTION, this
- // implies that lease could not be renewed because the wrapper
- // has determined that the set has expired.
- if (e.getException() == EXPIRED_SET_EXCEPTION)
- return;
-
- // Paranoia, check to make sure that lease is one of wrapped
- // client lease...if it's not, ignore the event
- final Lease l = e.getLease();
- if (l instanceof ClientLeaseWrapper) {
- final ClientLeaseWrapper clw = (ClientLeaseWrapper) l;
- queue.enqueue(new Discriminator(clw, true));
- }
- }
-
- //////////////////////////////////////////////////////////////
- // Methods required by the DesiredExpirationListener interface
-
- // Inherit java doc from super type
- public void expirationReached(LeaseRenewalEvent e) {
- // Paranoia, check to make sure that lease is one of wrapped
- // client lease...if it's not, ignore the event
- final Lease l = e.getLease();
- if (l instanceof ClientLeaseWrapper) {
- final ClientLeaseWrapper clw = (ClientLeaseWrapper) l;
- queue.enqueue(new Discriminator(clw, false)) ;
- }
- }
-
- public void run() {
- // Loop taking items off the queue and pass them to the server
- while (!hasBeenInterrupted()) {
- try {
- final Discriminator d = (Discriminator) queue.dequeue();
-
- if (d.isFailure) {
- server.renewalFailure(d.clw);
- } else {
- server.desiredExpirationReached(d.clw);
- }
-
- } catch (InterruptedException e) {
- // Someone wants this thread dead -- just return
- return;
- } catch (RuntimeException e) {
- logger.log(Level.INFO,
- "Exception in LRMEventListener Notifier while " +
- "processing an event from the LRM -- " +
- "attempting to continue",
- e);
-
- } catch (Error e) {
- logger.log(Level.INFO,
- "Exception in LRMEventListener Notifier while " +
- "processing an event from the LRM -- " +
- "attempting to continue",
- e);
- }
- }
- }
-
- /**
- * Trivial container class to tell us if we are processing the given
- * wrapper because of a failure event or a desired expiration
- * reached event.
- */
- static private class Discriminator {
- /** true if this wrapper is associated with a renewal failure event */
- final private boolean isFailure;
-
- /** The wrapped leases associated with the event */
- final private ClientLeaseWrapper clw;
-
- private Discriminator(ClientLeaseWrapper clw, boolean isFailure) {
- this.isFailure = isFailure;
- this.clw = clw;
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sun.jini.norm;
+
+import com.sun.jini.thread.InterruptedStatusThread;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import net.jini.core.lease.Lease;
+import net.jini.core.lease.LeaseDeniedException;
+import net.jini.lease.DesiredExpirationListener;
+import net.jini.lease.LeaseRenewalEvent;
+
+/**
+ * Object that transfers events from the Lease Renewal Manager to the
+ * rest of Norm Server. Isolates the renewal manager from having to
+ * block on the snapshot locks.
+ *
+ * @author Sun Microsystems, Inc.
+ */
+class LRMEventListener extends InterruptedStatusThread
+ implements DesiredExpirationListener
+{
+ /** Logger for logging messages for this class */
+ private static final Logger logger = Logger.getLogger("com.sun.jini.norm");
+
+ /** Ref to the main server object which has all the top level methods */
+ volatile private NormServerBaseImpl server;
+
+ /**
+ * Queue we use to decouple the reception of events from the lease
+ * renewal manager from the scheduling of the sending of remote
+ * events and modifying our internal tables (which both require
+ * obtaining serious locks).
+ */
+ final private Queue queue = new Queue();
+
+ /** Any events that hold this object are ignored */
+ final static LeaseDeniedException EXPIRED_SET_EXCEPTION =
+ new LeaseDeniedException("Set Expired");
+
+ /**
+ * Creates the listener as a non-daemon thread; no server is attached here, so one must be supplied through <code>setServer</code>.
+ */
+ LRMEventListener() {
+ super("LRM Event Listener");
+ setDaemon(false);
+ }
+
+ /**
+ * Attaches the server, but only if none is attached yet: once the field is non-null, later calls leave it unchanged.
+ * @param server Object that will make the actual internal updates and
+ * schedule the sending of remote events
+ */
+ void setServer(NormServerBaseImpl server){
+ synchronized (this){
+ if (this.server == null) this.server = server;
+ }
+ }
+
+ //////////////////////////////////////////////////
+ // Methods required by the LeaseListener interface
+
+ // Inherit java doc from super type
+ public void notify(LeaseRenewalEvent e) {
+ // Drop the event if its exception is the very same object as
+ // EXPIRED_SET_EXCEPTION (identity comparison): that marks a lease that could
+ // not be renewed because the wrapper has determined that the set has expired.
+ if (e.getException() == EXPIRED_SET_EXCEPTION)
+ return;
+
+ // Defensive check: only a ClientLeaseWrapper is queued (flagged as a
+ // renewal failure by the 'true' argument); any other lease is ignored.
+ final Lease l = e.getLease();
+ if (l instanceof ClientLeaseWrapper) {
+ final ClientLeaseWrapper clw = (ClientLeaseWrapper) l;
+ queue.enqueue(new Discriminator(clw, true));
+ }
+ }
+
+ //////////////////////////////////////////////////////////////
+ // Methods required by the DesiredExpirationListener interface
+
+ // Inherit java doc from super type
+ public void expirationReached(LeaseRenewalEvent e) {
+ // Defensive check: only a ClientLeaseWrapper is queued (flagged as not a
+ // failure by the 'false' argument); any other lease is ignored.
+ final Lease l = e.getLease();
+ if (l instanceof ClientLeaseWrapper) {
+ final ClientLeaseWrapper clw = (ClientLeaseWrapper) l;
+ queue.enqueue(new Discriminator(clw, false)) ;
+ }
+ }
+
+ public void run() {
+ // Until interrupted, take each queued event and hand it to the server: renewalFailure for failures, desiredExpirationReached otherwise
+ while (!hasBeenInterrupted()) {
+ try {
+ final Discriminator d = (Discriminator) queue.dequeue();
+
+ if (d.isFailure) {
+ server.renewalFailure(d.clw);
+ } else {
+ server.desiredExpirationReached(d.clw);
+ }
+
+ } catch (InterruptedException e) {
+ // InterruptedException is treated as a request to stop this thread -- leave run()
+ return;
+ } catch (RuntimeException e) { // logged at INFO; the loop then moves on to the next event
+ logger.log(Level.INFO,
+ "Exception in LRMEventListener Notifier while " +
+ "processing an event from the LRM -- " +
+ "attempting to continue",
+ e);
+
+ } catch (Error e) { // handled exactly like RuntimeException above
+ logger.log(Level.INFO,
+ "Exception in LRMEventListener Notifier while " +
+ "processing an event from the LRM -- " +
+ "attempting to continue",
+ e);
+ }
+ }
+ }
+
+ /**
+ * Immutable pair placed on the queue by <code>notify</code> and
+ * <code>expirationReached</code> and consumed by <code>run</code>: a wrapped
+ * lease plus a flag saying which of the two kinds of event produced it.
+ */
+ static private class Discriminator {
+ /** true when queued by notify (renewal failure), false when queued by expirationReached */
+ final private boolean isFailure;
+
+ /** The wrapped lease the event refers to */
+ final private ClientLeaseWrapper clw;
+
+ private Discriminator(ClientLeaseWrapper clw, boolean isFailure) {
+ this.isFailure = isFailure;
+ this.clw = clw;
+ }
+ }
+}
Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/norm/LeaseExpirationMgr.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/norm/LeaseExpirationMgr.java?rev=1593493&r1=1593492&r2=1593493&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/norm/LeaseExpirationMgr.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/norm/LeaseExpirationMgr.java Fri May 9 07:03:18 2014
@@ -1,369 +1,369 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.sun.jini.norm;
-
-import java.lang.ref.WeakReference;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import com.sun.jini.collection.WeakTable;
-import com.sun.jini.landlord.LeasedResource;
-import com.sun.jini.thread.InterruptedStatusThread;
-import com.sun.jini.thread.WakeupManager;
-
-/**
- * Lease manager that aggressively expires leases as their expiration times
- * occur. Also schedules and manages expiration warning events.
- * <p>
- * Note, unlike Mahalo's <code>LeaseExpirationManager</code> (which this
- * was seeded from), we make no attempt to make it generic because of
- * the need to schedule expiration warning events.
- *
- * @author Sun Microsystems, Inc.
- */
-class LeaseExpirationMgr implements WeakTable.KeyGCHandler {
- /** Logger for logging messages for this class */
- static final Logger logger = Logger.getLogger("com.sun.jini.norm");
-
- /**
- * Map of sets to task tickets.
- * <p>
- * A Note on Synchronization
- * <p>
- * Whenever we operate on the <code>ticketMap</code> we hold
- * the lock on the key being used. This is necessary because
- * expiration and warning sender tasks need to remove tickets from
- * the map but at the same time a renewal may be updating the map
- * to associate the set with a new ticket. If we don't synchronize
- * there is a small window where a task could remove the ticket
- * for its replacement.
- */
- private final WeakTable ticketMap;
-
- /** Ref to the main server object has all the top level methods */
- private volatile NormServerBaseImpl server;
-
- /** Queue of tasks, ordered by time */
- private final WakeupManager runQueue = new WakeupManager();
-
- /** Queue of tasks to expire sets */
- private final List expireQueue = new LinkedList();
-
- /** Thread to expire sets */
- private final Thread expireThread = new ExpirationThread();
-
- /**
- * Create a <code>LeaseExpirationMgr</code> to aggressively expire
- * the leases of the passed <code>NormServerBaseImpl</code>
- */
- LeaseExpirationMgr(NormServerBaseImpl server) {
- this.server = server;
- ticketMap = new WeakTable(this); //this escape is safe.
- }
-
- LeaseExpirationMgr(){
- ticketMap = new WeakTable(this); //this escape is safe.
- }
-
- /**
- * Can be set once only after construction.
- * @param server
- */
- void setServer(NormServerBaseImpl server){
- synchronized (this){
- if (this.server == null) this.server = server;
- }
- }
-
- void start(){
- expireThread.start();
- }
-
- /**
- * Terminate the <code>LeaseExpirationMgr</code>, killing any
- * threads it has started
- */
- void terminate() {
- runQueue.stop();
- runQueue.cancelAll();
- expireThread.interrupt();
- }
-
- /**
- * Notifies the manager of a new lease being created.
- *
- * @param resource the resource associated with the new lease
- */
- void register(LeasedResource resource) {
- // Need to synchronize because schedule manipulates
- // ticketMap.
- synchronized (resource) {
- schedule(resource);
- }
- }
-
- /**
- * Notifies the manager of a lease being renewed. <p>
- *
- * This method assumes the lock on <code>resource</code> is owned by the
- * current thread.
- *
- * @param resource the set for which tasks have to be rescheduled
- */
- void reschedule(LeasedResource resource) {
- /*
- * Remove the old event. This method is only called
- * (indirectly) from NormServerBaseImpl.renew() so we know that
- * we own the lock on resource.
- */
- WakeupManager.Ticket ticket =
- (WakeupManager.Ticket) ticketMap.remove(resource);
- if (ticket != null) {
- runQueue.cancel(ticket);
- }
- // Schedule the new event
- schedule(resource);
- }
-
- /**
- * Schedule a leased resource to be reaped in the future. Called
- * when a resource gets a lease, a lease is renewed, and during log
- * recovery.
- * <p>
- * This method assumes the lock on <code>resource</code> is owned by
- * the current thread.
- */
- void schedule(LeasedResource resource) {
- WakeupManager.Ticket ticket;
- final LeaseSet set = (LeaseSet) resource;
- MgrTask task;
-
- if (set.haveWarningRegistration()) {
- task = new SendWarning(set);
- ticket = runQueue.schedule(set.getWarningTime(), task);
- } else {
- task = new QueueExpiration(set);
- ticket = runQueue.schedule(set.getExpiration(), task);
- }
-
- /*
- * No window here because the tasks only use the ticket after
- * they acquire the lock on their set, but we still own the lock
- * on the set.
- */
- task.setTicket(ticket);
- ticketMap.getOrAdd(set, ticket);
- }
-
- // purposefully inherit doc comment from supertype
- // Called when LeaseResource we are tracking is garbage collected
- public void keyGC(Object value) {
- final WakeupManager.Ticket ticket = (WakeupManager.Ticket) value;
- runQueue.cancel(ticket);
- }
-
- /**
- * Expires sets queued for expiration. Perform the expiration in a
- * separate thread because the operation will block if a snapshot is going
- * on. It's OK for an expiration to block other expirations, which need
- * not be timely, but using the separate thread avoids blocking renewal
- * warnings, which should be timely.
- */
- private class ExpirationThread extends InterruptedStatusThread {
-
- ExpirationThread() {
- super("expire lease sets thread");
- setDaemon(true);
- }
-
- public void run() {
- while (!hasBeenInterrupted()) {
- try {
- Runnable task;
- synchronized (expireQueue) {
- if (expireQueue.isEmpty()) {
- expireQueue.wait();
- continue;
- }
- task = (Runnable) expireQueue.remove(0);
- }
- task.run();
- } catch (InterruptedException e) {
- return;
- } catch (Throwable t) {
- logger.log(Level.INFO,
- "Exception in lease set expiration thread -- " +
- "attempting to continue",
- t);
- }
- }
- }
- }
-
- /**
- * Utility base class for our tasks, mainly provides the the proper
- * locking for manipulating the ticketMap.
- */
- private abstract class MgrTask implements Runnable {
- /** Resource this task is to operate on */
- protected final WeakReference resourceRef;
-
- /** Ticket for this task */
- private WakeupManager.Ticket ticket;
-
- /**
- * Simple constructor.
- *
- * @param set the set this task is to operate on
- */
- protected MgrTask(LeaseSet set) {
- resourceRef = new WeakReference(set);
- }
-
- /** Set the ticket associated with this task. */
- private void setTicket(WakeupManager.Ticket ticket) {
- this.ticket = ticket;
- }
-
- /**
- * Removes this task's ticket from the ticket map iff this
- * task's ticket is in the map. Returns the
- * <code>LeaseSet</code> this task is to operate on or
- * <code>null</code> if this task should stop.
- */
- protected LeaseSet removeOurTicket() {
- final LeaseSet set = (LeaseSet) resourceRef.get();
- if (set != null) {
- synchronized (set) {
- final WakeupManager.Ticket currentTicket =
- (WakeupManager.Ticket) ticketMap.get(set);
- if (ticket.equals(currentTicket)) {
- ticketMap.remove(set);
- } else {
- /*
- * Someone removed us after we were committed to
- * run -- we should stop.
- */
- return null;
- }
- }
- }
-
- return set;
- }
-
- // purposefully inherit doc comment from supertype
- public abstract void run();
- }
-
- /** Task that queues a task to expire a lease set. */
- private class QueueExpiration extends MgrTask {
- QueueExpiration(LeaseSet set) {
- super(set);
- }
-
- public void run() {
- LeaseSet set = removeOurTicket();
- if (set != null) {
- synchronized (expireQueue) {
- expireQueue.add(new Expiration(set));
- expireQueue.notifyAll();
- }
- }
- }
- }
-
- /**
- * Objects that do the actual expiration of the set in question,
- * stuck in <code>expireQueue</code>.
- */
- private class Expiration implements Runnable {
-
- private final LeaseSet set;
- /**
- * Create a <code>Expiration</code> task for the passed resource.
- *
- * @param set the set this task is to operate on
- */
- private Expiration(LeaseSet set) {
- this.set = set;
- }
-
- // purposefully inherit doc comment from supertype
- public void run() {
- server.expireIfTime(set);
- /*
- * Note we don't care if it's actually time or not, if it
- * is not the task will be rescheduled by the renewal.
- */
- }
- }
-
- /**
- * Objects that do the schedule the warning events, also schedules
- * an expiration task.
- */
- private class SendWarning extends MgrTask {
- /**
- * Create a <code>SendWarning</code> task for the passed resource.
- *
- * @param set the set this task is to operate on
- */
- private SendWarning(LeaseSet set) {
- super(set);
- }
-
- // purposefully inherit doc comment from supertype
- public void run() {
- final LeaseSet s = (LeaseSet) resourceRef.get();
- if (s == null) {
- // set is gone, no work to do
- return;
- }
-
- /*
- * By holding this lock we prevent other threads from
- * scheduling new tasks for this set...if we have been
- * replaced we will return before scheduling a new task, if
- * we have not been we will schedule the new task and it can
- * be cleanly removed by any renew that is happening at the
- * same time.
- */
- synchronized (s) {
- final LeaseSet set = removeOurTicket();
- if (set == null) {
- // set is gone, or our task was replaced, no work to do
- return;
- }
-
- // Send event
- server.sendWarningEvent(set);
-
- // Schedule expiration task
- final MgrTask task = new QueueExpiration(set);
- final WakeupManager.Ticket newTicket =
- runQueue.schedule(set.getExpiration(), task);
- task.setTicket(newTicket);
- ticketMap.getOrAdd(set, newTicket);
- }
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sun.jini.norm;
+
+import java.lang.ref.WeakReference;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.sun.jini.collection.WeakTable;
+import com.sun.jini.landlord.LeasedResource;
+import com.sun.jini.thread.InterruptedStatusThread;
+import com.sun.jini.thread.WakeupManager;
+
+/**
+ * Lease manager that aggressively expires leases as their expiration times
+ * occur. Also schedules and manages expiration warning events.
+ * <p>
+ * Note, unlike Mahalo's <code>LeaseExpirationManager</code> (which this
+ * was seeded from), we make no attempt to make it generic because of
+ * the need to schedule expiration warning events.
+ *
+ * @author Sun Microsystems, Inc.
+ */
+class LeaseExpirationMgr implements WeakTable.KeyGCHandler {
+ /** Logger for logging messages for this class */
+ static final Logger logger = Logger.getLogger("com.sun.jini.norm");
+
+ /**
+ * Map of sets to task tickets.
+ * <p>
+ * A Note on Synchronization
+ * <p>
+ * Whenever we operate on the <code>ticketMap</code> we hold
+ * the lock on the key being used. This is necessary because
+ * expiration and warning sender tasks need to remove tickets from
+ * the map but at the same time a renewal may be updating the map
+ * to associate the set with a new ticket. If we don't synchronize
+ * there is a small window where a task could remove the ticket
+ * for its replacement.
+ */
+ private final WeakTable ticketMap;
+
+ /** Ref to the main server object, which has all the top level methods */
+ private volatile NormServerBaseImpl server;
+
+ /** Queue of tasks, ordered by time */
+ private final WakeupManager runQueue = new WakeupManager();
+
+ /** Queue of tasks to expire sets */
+ private final List expireQueue = new LinkedList();
+
+ /** Thread to expire sets */
+ private final Thread expireThread = new ExpirationThread();
+
+ /**
+ * Create a <code>LeaseExpirationMgr</code> to aggressively expire the leases of the passed
+ * <code>NormServerBaseImpl</code>; the expiration thread is not started until <code>start</code> is called.
+ */
+ LeaseExpirationMgr(NormServerBaseImpl server) {
+ this.server = server;
+ ticketMap = new WeakTable(this); // 'this' is handed to WeakTable before construction completes; the author judged this escape safe.
+ }
+
+ LeaseExpirationMgr(){ // leaves the server unset; it must be supplied later through setServer
+ ticketMap = new WeakTable(this); // 'this' is handed to WeakTable before construction completes; the author judged this escape safe.
+ }
+
+ /**
+ * Attaches the server, but only if none is attached yet: once the field is non-null, later calls leave it unchanged.
+ * @param server the server whose <code>expireIfTime</code> and <code>sendWarningEvent</code> methods the scheduled tasks call
+ */
+ void setServer(NormServerBaseImpl server){
+ synchronized (this){
+ if (this.server == null) this.server = server;
+ }
+ }
+
+ void start(){
+ expireThread.start(); // launches the ExpirationThread that runs the tasks placed on expireQueue
+ }
+
+ /**
+ * Terminate the <code>LeaseExpirationMgr</code>: stops the wakeup queue, cancels
+ * everything scheduled on it, and interrupts the expiration thread
+ */
+ void terminate() {
+ runQueue.stop();
+ runQueue.cancelAll();
+ expireThread.interrupt(); // ExpirationThread.run exits its loop once interrupted
+ }
+
+ /**
+ * Notifies the manager of a new lease being created: takes the lock on the resource and schedules its first task.
+ *
+ * @param resource the resource associated with the new lease (<code>schedule</code> casts it to <code>LeaseSet</code>)
+ */
+ void register(LeasedResource resource) {
+ // schedule() updates ticketMap, which by convention is only touched
+ // while holding the lock on the key being used (here, the resource).
+ synchronized (resource) {
+ schedule(resource);
+ }
+ }
+
+ /**
+ * Notifies the manager of a lease being renewed: cancels the task currently ticketed for the set, if any, then schedules a new one. <p>
+ *
+ * This method assumes the lock on <code>resource</code> is owned by the
+ * current thread.
+ *
+ * @param resource the set for which tasks have to be rescheduled
+ */
+ void reschedule(LeasedResource resource) {
+ /*
+ * Remove the old ticket and cancel its task. This method is only called
+ * (indirectly) from NormServerBaseImpl.renew() so we know that
+ * we own the lock on resource.
+ */
+ WakeupManager.Ticket ticket =
+ (WakeupManager.Ticket) ticketMap.remove(resource);
+ if (ticket != null) {
+ runQueue.cancel(ticket);
+ }
+ // Schedule the replacement task from the set's current warning/expiration time
+ schedule(resource);
+ }
+
+ /**
+ * Schedule the next task for a leased resource: a <code>SendWarning</code> at the warning time if the set has a
+ * warning registration, otherwise a <code>QueueExpiration</code> at the expiration time. Called when a resource
+ * gets a lease, a lease is renewed, and during log recovery. The resource is cast to <code>LeaseSet</code> unconditionally.
+ * <p>
+ * This method assumes the lock on <code>resource</code> is owned by
+ * the current thread.
+ */
+ void schedule(LeasedResource resource) {
+ WakeupManager.Ticket ticket;
+ final LeaseSet set = (LeaseSet) resource;
+ MgrTask task;
+
+ if (set.haveWarningRegistration()) {
+ task = new SendWarning(set);
+ ticket = runQueue.schedule(set.getWarningTime(), task);
+ } else {
+ task = new QueueExpiration(set);
+ ticket = runQueue.schedule(set.getExpiration(), task);
+ }
+
+ /*
+ * The task is scheduled before it knows its own ticket, but there is
+ * no window here: tasks only use the ticket after they acquire the
+ * lock on their set, and we still own the lock on the set.
+ */
+ task.setTicket(ticket);
+ ticketMap.getOrAdd(set, ticket); // NOTE(review): presumably relies on no ticket being mapped for the set at this point -- confirm getOrAdd semantics
+ }
+
+ // purposefully inherit doc comment from supertype
+ // Called when a LeasedResource we are tracking is garbage collected; cancels the ticket that was stored as its value
+ public void keyGC(Object value) {
+ final WakeupManager.Ticket ticket = (WakeupManager.Ticket) value;
+ runQueue.cancel(ticket);
+ }
+
+ /**
+ * Thread that runs the tasks placed on <code>expireQueue</code>, one at a time, in queue order.
+ * Expiration is performed on this separate thread because the operation will block if a snapshot is going
+ * on. It's OK for an expiration to block other expirations, which need
+ * not be timely, but using the separate thread avoids blocking renewal
+ * warnings, which should be timely. A task that throws is logged at INFO and the thread carries on.
+ */
+ private class ExpirationThread extends InterruptedStatusThread {
+
+ ExpirationThread() {
+ super("expire lease sets thread");
+ setDaemon(false); // non-daemon thread
+ }
+
+ public void run() {
+ while (!hasBeenInterrupted()) {
+ try {
+ Runnable task;
+ synchronized (expireQueue) {
+ if (expireQueue.isEmpty()) {
+ expireQueue.wait(); // woken by the notifyAll in QueueExpiration.run
+ continue; // go round again: re-check the interrupt status and the queue
+ }
+ task = (Runnable) expireQueue.remove(0);
+ }
+ task.run(); // runs after the expireQueue lock has been released
+ } catch (InterruptedException e) {
+ return;
+ } catch (Throwable t) {
+ logger.log(Level.INFO,
+ "Exception in lease set expiration thread -- " +
+ "attempting to continue",
+ t);
+ }
+ }
+ }
+ }
+
+ /**
+ * Utility base class for our scheduled tasks; mainly provides the proper
+ * locking for manipulating the ticketMap.
+ */
+ private abstract class MgrTask implements Runnable {
+ /** Weak reference to the set this task is to operate on; <code>get()</code> may return null, which the subclasses check for */
+ protected final WeakReference resourceRef;
+
+ /** Ticket under which this task was scheduled; assigned through <code>setTicket</code> after scheduling */
+ private WakeupManager.Ticket ticket;
+
+ /**
+ * Simple constructor; stores only a weak reference to the set.
+ *
+ * @param set the set this task is to operate on
+ */
+ protected MgrTask(LeaseSet set) {
+ resourceRef = new WeakReference(set);
+ }
+
+ /** Set the ticket associated with this task; callers in this file do so while holding the lock on the set. */
+ private void setTicket(WakeupManager.Ticket ticket) {
+ this.ticket = ticket;
+ }
+
+ /**
+ * Removes this task's ticket from the ticket map iff it is the ticket currently mapped for the set
+ * (checked and removed while holding the lock on the set). Returns the
+ * <code>LeaseSet</code> this task is to operate on, or <code>null</code> if this task should stop:
+ * either the set is no longer reachable or a different ticket (or none) is mapped for it.
+ */
+ protected LeaseSet removeOurTicket() {
+ final LeaseSet set = (LeaseSet) resourceRef.get();
+ if (set != null) {
+ synchronized (set) {
+ final WakeupManager.Ticket currentTicket =
+ (WakeupManager.Ticket) ticketMap.get(set);
+ if (ticket.equals(currentTicket)) {
+ ticketMap.remove(set);
+ } else {
+ /*
+ * Someone removed or replaced our ticket after we were
+ * committed to run -- we should stop.
+ */
+ return null;
+ }
+ }
+ }
+
+ return set;
+ }
+
+ // purposefully inherit doc comment from supertype
+ public abstract void run();
+ }
+
+ /** Task that, if its ticket is still the current one for the set, appends an <code>Expiration</code> for that set to <code>expireQueue</code>. */
+ private class QueueExpiration extends MgrTask {
+ QueueExpiration(LeaseSet set) {
+ super(set);
+ }
+
+ public void run() {
+ LeaseSet set = removeOurTicket(); // null means the set is gone or this task was superseded
+ if (set != null) {
+ synchronized (expireQueue) {
+ expireQueue.add(new Expiration(set));
+ expireQueue.notifyAll(); // wake the ExpirationThread if it is waiting on an empty queue
+ }
+ }
+ }
+ }
+
+ /**
+ * Objects that do the actual expiration of the set in question; they are
+ * placed on <code>expireQueue</code> and run by the <code>ExpirationThread</code>.
+ */
+ private class Expiration implements Runnable {
+
+ private final LeaseSet set; // strong reference, unlike MgrTask's WeakReference
+ /**
+ * Create an <code>Expiration</code> task for the passed resource.
+ *
+ * @param set the set this task is to operate on
+ */
+ private Expiration(LeaseSet set) {
+ this.set = set;
+ }
+
+ // purposefully inherit doc comment from supertype
+ public void run() {
+ server.expireIfTime(set);
+ /*
+ * Note we don't care if it's actually time or not; if it
+ * is not, the task will be rescheduled by the renewal.
+ */
+ }
+ }
+
+ /**
+ * Task that sends the expiration warning event for a set and then
+ * schedules a <code>QueueExpiration</code> task for the set's expiration time.
+ */
+ private class SendWarning extends MgrTask {
+ /**
+ * Create a <code>SendWarning</code> task for the passed resource.
+ *
+ * @param set the set this task is to operate on
+ */
+ private SendWarning(LeaseSet set) {
+ super(set);
+ }
+
+ // purposefully inherit doc comment from supertype
+ public void run() {
+ final LeaseSet s = (LeaseSet) resourceRef.get();
+ if (s == null) {
+ // the weak reference has been cleared: set is gone, no work to do
+ return;
+ }
+
+ /*
+ * By holding this lock we prevent other threads from
+ * scheduling new tasks for this set...if we have been
+ * replaced we will return before scheduling a new task, if
+ * we have not been we will schedule the new task and it can
+ * be cleanly removed by any renew that is happening at the
+ * same time.
+ */
+ synchronized (s) {
+ final LeaseSet set = removeOurTicket();
+ if (set == null) {
+ // set is gone, or our ticket is no longer the mapped one, no work to do
+ return;
+ }
+
+ // Send the warning event through the server
+ server.sendWarningEvent(set);
+
+ // Schedule the follow-up expiration task and record its ticket for the set
+ final MgrTask task = new QueueExpiration(set);
+ final WakeupManager.Ticket newTicket =
+ runQueue.schedule(set.getExpiration(), task);
+ task.setTicket(newTicket);
+ ticketMap.getOrAdd(set, newTicket);
+ }
+ }
+ }
+}