You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2013/12/10 12:20:58 UTC
svn commit: r1549810 - in /river/jtsk/skunk/qa_refactor/trunk:
qa/src/com/sun/jini/test/spec/lookupservice/test_set00/MultipleEvntLeaseRenewals.java
src/com/sun/jini/reggie/RegistrarImpl.java
src/com/sun/jini/thread/InterruptedStatusThread.java
Author: peter_firmstone
Date: Tue Dec 10 11:20:57 2013
New Revision: 1549810
URL: http://svn.apache.org/r1549810
Log:
Attempting to fix test failures on Windows Server 2008 Jdk1.7.0_25, using Jenkins as test environment, refactoring of RegistrarImpl
Modified:
river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/spec/lookupservice/test_set00/MultipleEvntLeaseRenewals.java
river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java
river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/InterruptedStatusThread.java
Modified: river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/spec/lookupservice/test_set00/MultipleEvntLeaseRenewals.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/spec/lookupservice/test_set00/MultipleEvntLeaseRenewals.java?rev=1549810&r1=1549809&r2=1549810&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/spec/lookupservice/test_set00/MultipleEvntLeaseRenewals.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/spec/lookupservice/test_set00/MultipleEvntLeaseRenewals.java Tue Dec 10 11:20:57 2013
@@ -86,7 +86,7 @@ public class MultipleEvntLeaseRenewals e
}
/** The event handler for the services registered by this class */
- private static BasicListener listener;
+ private BasicListener listener;
/** Performs actions necessary to prepare for execution of the
* current QA test.
Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java?rev=1549810&r1=1549809&r2=1549810&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java Tue Dec 10 11:20:57 2013
@@ -38,6 +38,7 @@ import com.sun.jini.reliableLog.LogHandl
import com.sun.jini.reliableLog.ReliableLog;
import com.sun.jini.start.LifeCycle;
import com.sun.jini.thread.InterruptedStatusThread;
+import com.sun.jini.thread.InterruptedStatusThread.Interruptable;
import com.sun.jini.thread.ReadersWriter;
import com.sun.jini.thread.ReadersWriter.ConcurrentLockException;
//import com.sun.jini.thread.ReadyState;
@@ -88,7 +89,9 @@ import java.util.NoSuchElementException;
import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
+import java.util.SortedSet;
import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executor;
@@ -96,6 +99,8 @@ import java.util.concurrent.LinkedBlocki
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.net.ServerSocketFactory;
@@ -221,7 +226,7 @@ class RegistrarImpl implements Registrar
* Identity map from SvcReg to SvcReg, ordered by lease expiration.
* Every service is in this map.
*/
- private final SortedMap<SvcReg,SvcReg> serviceByTime = new TreeMap<SvcReg,SvcReg>();
+ private final SortedSet<SvcReg> serviceByTime = new TreeSet<SvcReg>();
/**
* Map from String to HashMap mapping ServiceID to SvcReg. Every service
* is in this map under its types.
@@ -316,7 +321,8 @@ class RegistrarImpl implements Registrar
/** Event lease expiration thread */
private final Thread eventExpirer;
/** Unicast discovery request packet receiving thread */
- private volatile UnicastThread unicaster;
+ private volatile Thread unicaster;
+ private volatile Unicast unicast;
/** Multicast discovery request packet receiving thread */
private final Thread multicaster;
/** Multicast discovery announcement sending thread */
@@ -341,7 +347,7 @@ class RegistrarImpl implements Registrar
/** Flag indicating whether system is in a state of recovery */
private volatile boolean inRecovery;
/** Current number of records in the Log File since the last snapshot */
- private int logFileSize = 0;
+ private final AtomicInteger logFileSize = new AtomicInteger();
/** Log file must contain this many records before snapshot allowed */
private final int persistenceSnapshotThreshold ;
@@ -366,7 +372,7 @@ class RegistrarImpl implements Registrar
/** Interval to wait in between sending multicast announcements */
private final long multicastAnnouncementInterval;
/** Multicast announcement sequence number */
- private volatile long announcementSeqNo = 0L;
+ private final AtomicLong announcementSeqNo = new AtomicLong();
/** Network interfaces to use for multicast discovery */
private final NetworkInterface[] multicastInterfaces;
@@ -536,7 +542,6 @@ class RegistrarImpl implements Registrar
TimeUnit.MINUTES,
new LinkedBlockingQueue()
);
-// exec.allowCoreThreadTimeOut(true);
eventNotifierExec = exec;
// Set up Executor to perform discovery responses, this will naturally
// shutdown when Reggie terminates.
@@ -547,13 +552,12 @@ class RegistrarImpl implements Registrar
TimeUnit.MINUTES,
new LinkedBlockingQueue()
);
-// exec.allowCoreThreadTimeOut(true);
discoveryResponseExec = exec;
ReliableLog log = null;
Thread serviceExpirer = null;
Thread eventExpirer = null;
- UnicastThread unicaster = null;
+ Thread unicaster = null;
Thread multicaster = null;
Thread announcer = null;
Thread snapshotter = null;
@@ -564,37 +568,46 @@ class RegistrarImpl implements Registrar
@Override
public List<Thread> run() throws Exception {
+ Thread t;
List<Thread> list = new ArrayList<Thread>(6);
- list.add(new ServiceExpireThread());
- list.add(new EventExpireThread());
- list.add(new UnicastThread(unicastPort));
- list.add(new MulticastThread());
- list.add(new AnnounceThread());
- list.add(new SnapshotThread());
+ list.add(newDaemonThread(new ServiceExpire(RegistrarImpl.this), "service expire"));
+ list.add(newDaemonThread(new EventExpire(RegistrarImpl.this),"event expire"));
+ unicast = new Unicast(RegistrarImpl.this, unicastPort);
+ list.add(newInterruptStatusThread(unicast, "unicast request"));
+ list.add(newInterruptStatusThread(new Multicast(RegistrarImpl.this), "multicast request"));
+ list.add(newDaemonThread(new Announce(RegistrarImpl.this),"discovery announcement"));
+ list.add(newDaemonThread(new Snapshot(RegistrarImpl.this),"snapshot thread"));
return list;
}
+
+ private Thread newDaemonThread(Runnable r, String name){
+ Thread t = new Thread(r,name);
+ t.setDaemon(true);
+ return t;
+ }
+
+ private Thread newInterruptStatusThread(Runnable r, String name){
+ Thread t = new InterruptedStatusThread(r,name);
+ t.setDaemon(true);
+ return t;
+ }
}, context);
serviceExpirer = threads.get(0);
eventExpirer = threads.get(1);
- unicaster = (UnicastThread) threads.get(2);
+ unicaster = threads.get(2);
multicaster = threads.get(3);
announcer = threads.get(4);
snapshotter = threads.get(5);
if (init.persistent){
- log = new ReliableLog(init.persistenceDirectory, new LocalLogHandler());
+ log = new ReliableLog(init.persistenceDirectory, new LocalLogHandler(this));
if (logger.isLoggable(Level.CONFIG)) {
logger.log(Level.CONFIG, "using persistence directory {0}",
new Object[]{ init.persistenceDirectory });
}
- log.recover();
} else {
log = null;
}
- // log snapshot recovers myServiceID
- if (myServiceID == null) {
- myServiceID = newServiceID();
- }
constructionException = null;
} catch (PrivilegedActionException ex) {
@@ -615,7 +628,6 @@ class RegistrarImpl implements Registrar
loginContext = init.loginContext;
unicastDiscoveryHost = init.unicastDiscoveryHost;
config = init.config;
- computeMaxLeases();
}
/** A service item registration record. */
@@ -640,7 +652,7 @@ class RegistrarImpl implements Registrar
*
* @serial
*/
- public long leaseExpiration;
+ public volatile long leaseExpiration;
/** Simple constructor */
public SvcReg(Item item, Uuid leaseID, long leaseExpiration) {
@@ -1733,18 +1745,20 @@ class RegistrarImpl implements Registrar
* where 'dirname' is the name of the directory path (relative or
* absolute) where the snapshot and log file will be maintained.
*/
- private class LocalLogHandler extends LogHandler {
-
+ private static class LocalLogHandler extends LogHandler {
+ private final RegistrarImpl reggie;
/** Simple constructor */
- public LocalLogHandler() { }
+ public LocalLogHandler(RegistrarImpl reggie) {
+ this.reggie = reggie;
+ }
/* Overrides snapshot() defined in ReliableLog's LogHandler class. */
public void snapshot(OutputStream out) throws IOException {
- concurrentObj.readLock();
+ reggie.concurrentObj.readLock();
try {
- takeSnapshot(out);
+ reggie.takeSnapshot(out);
} finally {
- concurrentObj.readUnlock();
+ reggie.concurrentObj.readUnlock();
}
}
@@ -1752,11 +1766,11 @@ class RegistrarImpl implements Registrar
public void recover(InputStream in)
throws IOException, ClassNotFoundException
{
- concurrentObj.writeLock();
+ reggie.concurrentObj.writeLock();
try {
- recoverSnapshot(in);
+ reggie.recoverSnapshot(in);
} finally {
- concurrentObj.writeUnlock();
+ reggie.concurrentObj.writeUnlock();
}
}
@@ -1783,12 +1797,12 @@ class RegistrarImpl implements Registrar
* by the type of record that was retrieved.
*/
public void applyUpdate(Object logRecObj) {
- ((LogRecord)logRecObj).apply(RegistrarImpl.this);
+ ((LogRecord)logRecObj).apply(reggie);
}
}
/** Base class for iterating over all Items that match a Template. */
- private abstract class ItemIter {
+ private static abstract class ItemIter {
/** Current time */
public final long now = System.currentTimeMillis();
/** True means duplicate items are possible */
@@ -1831,18 +1845,19 @@ class RegistrarImpl implements Registrar
}
/** Iterate over all Items. */
- private class AllItemIter extends ItemIter {
+ private static class AllItemIter extends ItemIter {
/** Iterator over serviceByID */
- private final Iterator iter;
+ private final Iterator<SvcReg> iter;
/** Assumes the empty template */
- public AllItemIter() {
+ public AllItemIter(Iterator<SvcReg> it) {
super(null);
- iter = serviceByID.values().iterator();
+ iter = it;
step();
}
/** Set reg to the next matching element, or null if none */
+ @Override
protected void step() {
while (iter.hasNext()) {
reg = (SvcReg)iter.next();
@@ -1854,20 +1869,17 @@ class RegistrarImpl implements Registrar
}
/** Iterates over all services that match template's service types */
- private class SvcIterator extends ItemIter {
+ private static class SvcIterator extends ItemIter {
/** Iterator for list of matching services. */
- private final Iterator services;
+ private final Iterator<SvcReg> services;
/**
* tmpl.serviceID == null and
* tmpl.serviceTypes is not empty
*/
- public SvcIterator(Template tmpl) {
+ public SvcIterator(Template tmpl, Iterator<SvcReg> it) {
super(tmpl);
- Map map = (Map) serviceByTypeName.get(
- tmpl.serviceTypes[0].getName());
- services = map != null ? map.values().iterator() :
- Collections.EMPTY_LIST.iterator();
+ services = it;
step();
}
@@ -1894,35 +1906,24 @@ class RegistrarImpl implements Registrar
}
/** Iterate over all matching Items by attribute value. */
- private class AttrItemIter extends ItemIter {
+ private static class AttrItemIter extends ItemIter {
/** SvcRegs obtained from serviceByAttr for chosen attr */
- protected List<SvcReg> svcs;
+ private final List<SvcReg> svcs;
/** Current index into svcs */
- protected int svcidx;
+ private int svcidx;
/**
* tmpl.serviceID == null and
* tmpl.serviceTypes is empty and
* tmpl.attributeSetTemplates[setidx].fields[fldidx] != null
*/
- public AttrItemIter(Template tmpl, int setidx, int fldidx) {
- super(tmpl);
- EntryRep set = tmpl.attributeSetTemplates[setidx];
- HashMap<Object,List<SvcReg>>[] attrMaps =
- serviceByAttr.get(getDefiningClass(set.eclass,
- fldidx));
- if (attrMaps != null && attrMaps[fldidx] != null) {
- svcs = attrMaps[fldidx].get(set.fields[fldidx]);
- if (svcs != null) {
- svcidx = svcs.size();
- step();
- }
- }
- }
-
- /** Simple constructor */
- protected AttrItemIter(Template tmpl) {
+ public AttrItemIter(Template tmpl, List<SvcReg> svcs) {
super(tmpl);
+ this.svcs = svcs;
+ if (svcs != null) {
+ svcidx = svcs.size();
+ step();
+ }
}
/** Set reg to the next matching element, or null if none. */
@@ -1938,24 +1939,6 @@ class RegistrarImpl implements Registrar
}
}
- /** Iterate over all matching Items by no-fields entry class. */
- private class EmptyAttrItemIter extends AttrItemIter {
-
- /**
- * tmpl.serviceID == null and
- * tmpl.serviceTypes is empty and
- * eclass has no fields
- */
- public EmptyAttrItemIter(Template tmpl, EntryClass eclass) {
- super(tmpl);
- svcs = serviceByEmptyAttr.get(eclass);
- if (svcs != null) {
- svcidx = svcs.size();
- step();
- }
- }
- }
-
/** Iterate over all matching Items by entry class, dups possible. */
private class ClassItemIter extends ItemIter {
/** Entry class to match on */
@@ -2040,15 +2023,16 @@ class RegistrarImpl implements Registrar
}
/** Iterate over a singleton matching Item by serviceID. */
- private class IDItemIter extends ItemIter {
+ private static class IDItemIter extends ItemIter {
/** tmpl.serviceID != null */
- public IDItemIter(Template tmpl) {
+ public IDItemIter(Template tmpl, SvcReg reg) {
super(tmpl);
- reg = serviceByID.get(tmpl.serviceID);
if (reg != null &&
- (reg.leaseExpiration <= now || !matchItem(tmpl, reg.item)))
+ (reg.leaseExpiration <= now || !matchItem(tmpl, reg.item))) {
reg = null;
+ }
+ this.reg = reg;
}
/** Set reg to null */
@@ -2405,30 +2389,29 @@ class RegistrarImpl implements Registrar
}
/** Service lease expiration thread code */
- private class ServiceExpireThread extends InterruptedStatusThread {
-
+ private static class ServiceExpire implements Runnable {
+ RegistrarImpl reggie;
/** Create a daemon thread */
- public ServiceExpireThread() {
- super("service expire");
- setDaemon(true);
+ public ServiceExpire(RegistrarImpl reggie) {
+ this.reggie = reggie;
}
public void run() {
try {
- concurrentObj.writeLock();
+ reggie.concurrentObj.writeLock();
} catch (ConcurrentLockException e) {
return;
}
try {
- while (!hasBeenInterrupted()) {
+ while (!Thread.currentThread().isInterrupted()) {
long now = System.currentTimeMillis();
while (true) {
- SvcReg reg = serviceByTime.firstKey();
- minSvcExpiration = reg.leaseExpiration;
- if (minSvcExpiration > now)
+ SvcReg reg = reggie.serviceByTime.first();
+ reggie.minSvcExpiration = reg.leaseExpiration;
+ if (reggie.minSvcExpiration > now)
break;
- deleteService(reg, now);
- addLogRecord(new ServiceLeaseCancelledLogObj(
+ reggie.deleteService(reg, now);
+ reggie.addLogRecord(new ServiceLeaseCancelledLogObj(
reg.item.serviceID, reg.leaseID));
if (logger.isLoggable(Level.FINE)) {
logger.log(
@@ -2437,46 +2420,44 @@ class RegistrarImpl implements Registrar
new Object[]{ reg.item.serviceID });
}
}
- queueEvents();
try {
- concurrentObj.writerWait(serviceNotifier,
- minSvcExpiration - now);
+ reggie.concurrentObj.writerWait(reggie.serviceNotifier,
+ reggie.minSvcExpiration - now);
} catch (ConcurrentLockException e) {
return;
}
}
} finally {
- concurrentObj.writeUnlock();
+ reggie.concurrentObj.writeUnlock();
}
}
}
/** Event lease expiration thread code */
- private class EventExpireThread extends InterruptedStatusThread {
-
+ private static class EventExpire implements Runnable {
+ private final RegistrarImpl reggie;
/** Create a daemon thread */
- public EventExpireThread() {
- super("event expire");
- setDaemon(true);
+ public EventExpire(RegistrarImpl reggie) {
+ this.reggie = reggie;
}
public void run() {
try {
- concurrentObj.writeLock();
+ reggie.concurrentObj.writeLock();
} catch (ConcurrentLockException e) {
return;
}
try {
- while (!hasBeenInterrupted()) {
+ while (!Thread.currentThread().isInterrupted()) {
long now = System.currentTimeMillis();
- minEventExpiration = Long.MAX_VALUE;
- while (!eventByTime.isEmpty()) {
- EventReg reg = eventByTime.firstKey();
+ reggie.minEventExpiration = Long.MAX_VALUE;
+ while (!reggie.eventByTime.isEmpty()) {
+ EventReg reg = reggie.eventByTime.firstKey();
if (reg.getLeaseExpiration() > now) {
- minEventExpiration = reg.getLeaseExpiration();
+ reggie.minEventExpiration = reg.getLeaseExpiration();
break;
}
- deleteEvent(reg);
+ reggie.deleteEvent(reg);
if (logger.isLoggable(Level.FINE)) {
logger.log(
Level.FINE,
@@ -2485,14 +2466,14 @@ class RegistrarImpl implements Registrar
}
}
try {
- concurrentObj.writerWait(eventNotifier,
- minEventExpiration - now);
+ reggie.concurrentObj.writerWait(reggie.eventNotifier,
+ reggie.minEventExpiration - now);
} catch (ConcurrentLockException e) {
return;
}
}
} finally {
- concurrentObj.writeUnlock();
+ reggie.concurrentObj.writeUnlock();
}
}
}
@@ -2585,24 +2566,25 @@ class RegistrarImpl implements Registrar
}
/** Multicast discovery request thread code. */
- private class MulticastThread extends InterruptedStatusThread {
-
+ private static class Multicast implements Runnable, Interruptable {
+ private final RegistrarImpl reggie;
/** Multicast group address used by multicast requests */
private final InetAddress requestAddr;
/** Multicast socket to receive packets */
private final MulticastSocket socket;
/** Interfaces for which configuration failed */
private final List<NetworkInterface> failedInterfaces;
+
+ private volatile boolean interrupted = false;
/**
* Create a high priority daemon thread. Set up the socket now
* rather than in run, so that we get any exception up front.
*/
- public MulticastThread() throws IOException {
- super("multicast request");
- setDaemon(true);
+ public Multicast(RegistrarImpl reggie) throws IOException {
+ this.reggie = reggie;
List<NetworkInterface> failedInterfaces = new ArrayList<NetworkInterface>();
- if (multicastInterfaces != null && multicastInterfaces.length == 0)
+ if (reggie.multicastInterfaces != null && reggie.multicastInterfaces.length == 0)
{
requestAddr = null;
socket = null;
@@ -2611,11 +2593,12 @@ class RegistrarImpl implements Registrar
}
requestAddr = Constants.getRequestAddress();
socket = new MulticastSocket(Constants.discoveryPort);
- if (multicastInterfaces != null) {
- Level failureLogLevel = multicastInterfacesSpecified ?
+ if (reggie.multicastInterfaces != null) {
+ Level failureLogLevel = reggie.multicastInterfacesSpecified ?
Level.WARNING : Levels.HANDLED;
- for (int i = 0; i < multicastInterfaces.length; i++) {
- NetworkInterface nic = multicastInterfaces[i];
+ int l = reggie.multicastInterfaces.length;
+ for (int i = 0; i < l; i++) {
+ NetworkInterface nic = reggie.multicastInterfaces[i];
try {
socket.setNetworkInterface(nic);
socket.joinGroup(requestAddr);
@@ -2646,17 +2629,17 @@ class RegistrarImpl implements Registrar
}
public void run() {
- if (multicastInterfaces != null && multicastInterfaces.length == 0)
+ if (reggie.multicastInterfaces != null && reggie.multicastInterfaces.length == 0)
{
return;
}
byte[] buf = new byte[
- multicastRequestConstraints.getMulticastMaxPacketSize(
+ reggie.multicastRequestConstraints.getMulticastMaxPacketSize(
DEFAULT_MAX_PACKET_SIZE)];
DatagramPacket dgram = new DatagramPacket(buf, buf.length);
long retryTime =
- System.currentTimeMillis() + multicastInterfaceRetryInterval;
- while (!hasBeenInterrupted()) {
+ System.currentTimeMillis() + reggie.multicastInterfaceRetryInterval;
+ while (!interrupted) {
try {
int timeout = 0;
if (!failedInterfaces.isEmpty()) {
@@ -2667,7 +2650,7 @@ class RegistrarImpl implements Registrar
if (failedInterfaces.isEmpty()) {
timeout = 0;
} else {
- timeout = multicastInterfaceRetryInterval;
+ timeout = reggie.multicastInterfaceRetryInterval;
retryTime =
System.currentTimeMillis() + timeout;
}
@@ -2689,12 +2672,12 @@ class RegistrarImpl implements Registrar
} catch (BufferUnderflowException e) {
throw new DiscoveryProtocolException(null, e);
}
- multicastRequestConstraints.checkProtocolVersion(pv);
- discoveryResponseExec.execute(
+ reggie.multicastRequestConstraints.checkProtocolVersion(pv);
+ reggie.discoveryResponseExec.execute(
new DecodeRequestTask(
dgram,
- getDiscovery(pv),
- RegistrarImpl.this
+ reggie.getDiscovery(pv),
+ reggie
)
);
@@ -2706,7 +2689,7 @@ class RegistrarImpl implements Registrar
} catch (InterruptedIOException e) {
break;
} catch (Exception e) {
- if (hasBeenInterrupted()) {
+ if (interrupted) {
break;
}
logger.log(Levels.HANDLED,
@@ -2718,9 +2701,8 @@ class RegistrarImpl implements Registrar
public void interrupt() {
// close socket to interrupt MulticastSocket.receive operation
- if (socket != null)
- socket.close();
- super.interrupt();
+ interrupted = true;
+ if (socket != null) socket.close();
}
/**
@@ -2730,8 +2712,8 @@ class RegistrarImpl implements Registrar
* interface.
*/
private void retryFailedInterfaces() {
- for (Iterator i = failedInterfaces.iterator(); i.hasNext(); ) {
- NetworkInterface nic = (NetworkInterface) i.next();
+ for (Iterator<NetworkInterface> i = failedInterfaces.iterator(); i.hasNext(); ) {
+ NetworkInterface nic = i.next();
try {
if (nic != null) {
socket.setNetworkInterface(nic);
@@ -2739,7 +2721,7 @@ class RegistrarImpl implements Registrar
socket.joinGroup(requestAddr);
i.remove();
- Level l = multicastInterfacesSpecified ?
+ Level l = reggie.multicastInterfacesSpecified ?
Level.INFO : Level.FINE;
if (logger.isLoggable(l)) {
if (nic != null) {
@@ -2756,40 +2738,42 @@ class RegistrarImpl implements Registrar
}
/** Unicast discovery request thread code. */
- private class UnicastThread extends InterruptedStatusThread {
+ private static class Unicast implements Runnable, Interruptable {
+ private final RegistrarImpl reggie;
/** Server socket to accepts connections on. */
private final ServerSocket listen;
/** Listen port */
public final int port;
+ private volatile boolean interrupted = false;
+
/**
* Create a daemon thread. Set up the socket now rather than in run,
* so that we get any exception up front.
*/
- public UnicastThread(int port) throws IOException {
- super("unicast request");
- setDaemon(true);
+ public Unicast(RegistrarImpl reggie, int port) throws IOException {
+ this.reggie = reggie;
ServerSocket listen = null;
if (port == 0) {
try {
- listen = serverSocketFactory.createServerSocket(Constants.discoveryPort);
+ listen = reggie.serverSocketFactory.createServerSocket(Constants.discoveryPort);
} catch (IOException e) {
logger.log(
Levels.HANDLED, "failed to bind to default port", e);
}
}
if (listen == null) {
- listen = serverSocketFactory.createServerSocket(port);
+ listen = reggie.serverSocketFactory.createServerSocket(port);
}
this.listen = listen;
this.port = listen.getLocalPort();
}
public void run() {
- while (!hasBeenInterrupted()) {
+ while (!interrupted) {
try {
Socket socket = listen.accept();
- if (hasBeenInterrupted()) {
+ if (interrupted) {
try {
socket.close();
} catch (IOException e) {
@@ -2798,7 +2782,7 @@ class RegistrarImpl implements Registrar
}
break;
}
- discoveryResponseExec.execute(new SocketTask(socket, RegistrarImpl.this));
+ reggie.discoveryResponseExec.execute(new SocketTask(socket, reggie));
} catch (InterruptedIOException e) {
break;
} catch (Exception e) {
@@ -2822,17 +2806,17 @@ class RegistrarImpl implements Registrar
*/
public void interrupt() {
try {
- Socket s = socketFactory.createSocket(InetAddress.getLocalHost(), port);
+ interrupted = true;
+ Socket s = reggie.socketFactory.createSocket(InetAddress.getLocalHost(), port);
s.close();
} catch (IOException e) {
}
- super.interrupt();
}
-
}
/** Multicast discovery announcement thread code. */
- private class AnnounceThread extends InterruptedStatusThread {
+ private static class Announce implements Runnable {
+ private final RegistrarImpl reggie;
/** Multicast socket to send packets on */
private final MulticastSocket socket;
/** Cached datagram packets */
@@ -2846,14 +2830,15 @@ class RegistrarImpl implements Registrar
* Create a daemon thread. Set up the socket now rather than in run,
* so that we get any exception up front.
*/
- public AnnounceThread() throws IOException {
- super("discovery announcement");
- setDaemon(true);
- if (multicastInterfaces == null || multicastInterfaces.length > 0)
+ public Announce(RegistrarImpl reggie) throws IOException {
+ this.reggie = reggie;
+// super("discovery announcement");
+// setDaemon(true);
+ if (reggie.multicastInterfaces == null || reggie.multicastInterfaces.length > 0)
{
socket = new MulticastSocket();
socket.setTimeToLive(
- multicastAnnouncementConstraints.getMulticastTimeToLive(
+ reggie.multicastAnnouncementConstraints.getMulticastTimeToLive(
DEFAULT_MULTICAST_TTL));
} else {
socket = null;
@@ -2861,17 +2846,17 @@ class RegistrarImpl implements Registrar
}
public synchronized void run() {
- if (multicastInterfaces != null && multicastInterfaces.length == 0)
+ if (reggie.multicastInterfaces != null && reggie.multicastInterfaces.length == 0)
{
return;
}
try {
- while (!hasBeenInterrupted() && announce(memberGroups)) {
- wait(multicastAnnouncementInterval);
+ while (!Thread.currentThread().isInterrupted() && announce(reggie.memberGroups)) {
+ wait(reggie.multicastAnnouncementInterval);
}
} catch (InterruptedException e) {
}
- if (memberGroups.length > 0)
+ if (reggie.memberGroups.length > 0)
announce(new String[0]);//send NO_GROUPS just before shutdown
socket.close();
}
@@ -2882,26 +2867,27 @@ class RegistrarImpl implements Registrar
* synchronized run method in thread.
*/
private boolean announce(String[] groups) {
- if (dataPackets == null || !lastLocator.equals(myLocator) ||
+ if (dataPackets == null || !lastLocator.equals(reggie.myLocator) ||
!Arrays.equals(lastGroups, groups))
{
List packets = new ArrayList();
Discovery disco;
try {
- disco = getDiscovery(multicastAnnouncementConstraints
+ disco = reggie.getDiscovery(reggie.multicastAnnouncementConstraints
.chooseProtocolVersion());
} catch (DiscoveryProtocolException e) {
throw new AssertionError(e);
}
+ LookupLocator myLocator = reggie.myLocator;// Atomic
EncodeIterator ei = disco.encodeMulticastAnnouncement(
- new MulticastAnnouncement(announcementSeqNo++,
+ new MulticastAnnouncement(reggie.announcementSeqNo.getAndIncrement(),
myLocator.getHost(),
myLocator.getPort(),
groups,
- myServiceID),
- multicastAnnouncementConstraints
+ reggie.myServiceID),
+ reggie.multicastAnnouncementConstraints
.getMulticastMaxPacketSize(DEFAULT_MAX_PACKET_SIZE),
- multicastAnnouncementConstraints
+ reggie.multicastAnnouncementConstraints
.getUnfulfilledConstraints());
while (ei.hasNext()) {
try {
@@ -2937,11 +2923,12 @@ class RegistrarImpl implements Registrar
private void send(DatagramPacket[] packets)
throws InterruptedIOException
{
- if (multicastInterfaces != null) {
- Level failureLogLevel = multicastInterfacesSpecified ?
+ if (reggie.multicastInterfaces != null) {
+ Level failureLogLevel = reggie.multicastInterfacesSpecified ?
Level.WARNING : Levels.HANDLED;
- for (int i = 0; i < multicastInterfaces.length; i++) {
- send(packets, multicastInterfaces[i], failureLogLevel);
+ int l = reggie.multicastInterfaces.length;
+ for (int i = 0; i < l; i++) {
+ send(packets, reggie.multicastInterfaces[i], failureLogLevel);
}
} else {
send(packets, null, Level.WARNING);
@@ -3046,33 +3033,35 @@ class RegistrarImpl implements Registrar
* be locked while a reader mutex is locked, allows the snapshot to
* be treated as a reader process.
*/
- private class SnapshotThread extends InterruptedStatusThread {
-
+ private static class Snapshot implements Runnable {
+ RegistrarImpl reggie;
/** Create a daemon thread */
- public SnapshotThread() {
- super("snapshot thread");
- setDaemon(true);
+ public Snapshot(RegistrarImpl reggie) {
+ this.reggie = reggie;
+// super("snapshot thread");
+// setDaemon(true);
}
public void run() {
- if (log == null) {
+ if (reggie.log == null) {
return;
}
try {
- concurrentObj.readLock();
+ reggie.concurrentObj.readLock();
} catch (ConcurrentLockException e) {
return;
}
try {
- while (!hasBeenInterrupted()) {
+ while (!Thread.currentThread().isInterrupted()) {
try {
- concurrentObj.readerWait(snapshotNotifier,
+ reggie.concurrentObj.readerWait(reggie.snapshotNotifier,
Long.MAX_VALUE);
try {
- log.snapshot();
- logFileSize = 0;
+ reggie.log.snapshot();
+ reggie.logFileSize.set(0);
} catch (Exception e) {
- if (hasBeenInterrupted())
+ // InterruptedException is never thrown in try
+ if (Thread.currentThread().isInterrupted())
return;
logger.log(Level.WARNING, "snapshot failed", e);
}
@@ -3081,7 +3070,7 @@ class RegistrarImpl implements Registrar
}
}
} finally {
- concurrentObj.readUnlock();
+ reggie.concurrentObj.readUnlock();
}
}
}
@@ -3242,7 +3231,6 @@ class RegistrarImpl implements Registrar
throw new SecurityException("privileged service id");
addAttributesDo(serviceID, leaseID, attrSets);
addLogRecord(new AttrsAddedLogObj(serviceID, leaseID, attrSets));
- queueEvents();
} finally {
concurrentObj.writeUnlock();
}
@@ -3263,7 +3251,6 @@ class RegistrarImpl implements Registrar
modifyAttributesDo(serviceID, leaseID, attrSetTmpls, attrSets);
addLogRecord(new AttrsModifiedLogObj(serviceID, leaseID,
attrSetTmpls, attrSets));
- queueEvents();
} finally {
concurrentObj.writeUnlock();
}
@@ -3282,7 +3269,6 @@ class RegistrarImpl implements Registrar
throw new SecurityException("privileged service id");
setAttributesDo(serviceID, leaseID, attrSets);
addLogRecord(new AttrsSetLogObj(serviceID, leaseID, attrSets));
- queueEvents();
} finally {
concurrentObj.writeUnlock();
}
@@ -3297,7 +3283,6 @@ class RegistrarImpl implements Registrar
try {
cancelServiceLeaseDo(serviceID, leaseID);
addLogRecord(new ServiceLeaseCancelledLogObj(serviceID, leaseID));
- queueEvents();
if (logger.isLoggable(Level.FINE)) {
logger.log(
Level.FINE,
@@ -3384,7 +3369,6 @@ class RegistrarImpl implements Registrar
try {
Exception[] exceptions = cancelLeasesDo(regIDs, leaseIDs);
addLogRecord(new LeasesCancelledLogObj(regIDs, leaseIDs));
- queueEvents();
if (logger.isLoggable(Level.FINE)) {
for (int i = 0; i < regIDs.length; i++) {
if (exceptions != null && exceptions[i] != null) {
@@ -3431,7 +3415,6 @@ class RegistrarImpl implements Registrar
joiner.addAttributes(attrSets);
lookupAttrs = joiner.getAttributes();
addLogRecord(new LookupAttributesChangedLogObj(lookupAttrs));
- queueEvents();
} catch (UnknownLeaseException e) {
throw new AssertionError("Self-registration never expires");
} finally {
@@ -3453,7 +3436,6 @@ class RegistrarImpl implements Registrar
joiner.modifyAttributes(attrSetTemplates, attrSets, true);
lookupAttrs = joiner.getAttributes();
addLogRecord(new LookupAttributesChangedLogObj(lookupAttrs));
- queueEvents();
} catch (UnknownLeaseException e) {
throw new AssertionError("Self-registration never expires");
} finally {
@@ -3733,28 +3715,32 @@ class RegistrarImpl implements Registrar
try {
if (port == unicastPort)
return;
- if ((port == 0 && unicaster.port == Constants.discoveryPort) ||
- port == unicaster.port)
+ if ((port == 0 && unicast.port == Constants.discoveryPort) ||
+ port == unicast.port)
{
unicastPort = port;
addLogRecord(new UnicastPortSetLogObj(port));
return;
}
/* create a UnicastThread that listens on the new port */
- UnicastThread newUnicaster = new UnicastThread(port);
+ unicast = new Unicast(this, port);
+ Thread newUnicaster = new InterruptedStatusThread( unicast , "unicast request");
+ newUnicaster.setDaemon(true);
/* terminate the current UnicastThread listening on the old port */
unicaster.interrupt();
try {
unicaster.join();
- } catch (InterruptedException e) { }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // Restore the interrupt.
+ }
/* start the UnicastThread listening on the new port */
unicaster = newUnicaster;
unicaster.start();
unicastPort = port;
myLocator = (proxy instanceof RemoteMethodControl) ?
new ConstrainableLookupLocator(
- myLocator.getHost(), unicaster.port, null) :
- new LookupLocator(myLocator.getHost(), unicaster.port);
+ myLocator.getHost(), unicast.port, null) :
+ new LookupLocator(myLocator.getHost(), unicast.port);
synchronized (announcer) {
announcer.notify();
}
@@ -3763,7 +3749,7 @@ class RegistrarImpl implements Registrar
logger.log(
Level.CONFIG,
"changed unicast discovery port to {0}",
- new Object[]{ Integer.valueOf(unicaster.port) });
+ new Object[]{ Integer.valueOf(unicast.port) });
}
} finally {
concurrentObj.writeUnlock();
@@ -4251,7 +4237,7 @@ class RegistrarImpl implements Registrar
*/
private void addService(SvcReg reg) {
serviceByID.put(reg.item.serviceID, reg);
- serviceByTime.put(reg, reg);
+ serviceByTime.add(reg);
addServiceByTypes(reg.item.serviceType, reg);
EntryRep[] entries = reg.item.attributeSets;
for (int i = entries.length; --i >= 0; ) {
@@ -4520,23 +4506,36 @@ class RegistrarImpl implements Registrar
/** Return an appropriate iterator for Items matching the Template. */
private ItemIter matchingItems(Template tmpl) {
if (tmpl.serviceID != null)
- return new IDItemIter(tmpl);
- if (!isEmpty(tmpl.serviceTypes))
- return new SvcIterator(tmpl);
+ return new IDItemIter(tmpl, serviceByID.get(tmpl.serviceID));
+ if (!isEmpty(tmpl.serviceTypes)){
+ Map<ServiceID,SvcReg> map = serviceByTypeName.get(
+ tmpl.serviceTypes[0].getName());
+ Iterator<SvcReg> services = map != null ? map.values().iterator() :
+ Collections.EMPTY_LIST.iterator();
+ return new SvcIterator(tmpl, services);
+ }
EntryRep[] sets = tmpl.attributeSetTemplates;
if (isEmpty(sets))
- return new AllItemIter();
+ return new AllItemIter(serviceByID.values().iterator());
for (int i = sets.length; --i >= 0; ) {
Object[] fields = sets[i].fields;
if (fields.length == 0) {
EntryClass eclass = getEmptyEntryClass(sets[i].eclass);
if (eclass != null)
- return new EmptyAttrItemIter(tmpl, eclass);
+ return new AttrItemIter(tmpl, serviceByEmptyAttr.get(eclass));
} else {
/* try subclass fields before superclass fields */
for (int j = fields.length; --j >= 0; ) {
- if (fields[j] != null)
- return new AttrItemIter(tmpl, i, j);
+ if (fields[j] != null){
+ EntryRep set = tmpl.attributeSetTemplates[i];
+ Map<Object,List<SvcReg>>[] attrMaps =
+ serviceByAttr.get(getDefiningClass(set.eclass,j));
+ List<SvcReg> svcs = null;
+ if (attrMaps != null && attrMaps[j] != null) {
+ svcs = attrMaps[j].get(set.fields[j]);
+ }
+ return new AttrItemIter(tmpl, svcs);
+ }
}
}
}
@@ -4988,6 +4987,16 @@ class RegistrarImpl implements Registrar
if (constructionException != null) throw constructionException;
concurrentObj.writeLock();
try {
+ if (log != null) {
+ inRecovery = true;
+ log.recover();
+ inRecovery = false;
+ }
+ // log snapshot recovers myServiceID
+ if (myServiceID == null) {
+ myServiceID = newServiceID();
+ }
+ computeMaxLeases();
// Make sure we're exporting with correct login context.
AccessController.doPrivileged(new PrivilegedExceptionAction<Object>(){
@@ -4996,8 +5005,8 @@ class RegistrarImpl implements Registrar
proxy = RegistrarProxy.getInstance(myRef, myServiceID);
myLocator = (proxy instanceof RemoteMethodControl) ?
new ConstrainableLookupLocator(
- unicastDiscoveryHost, unicaster.port, null) :
- new LookupLocator(unicastDiscoveryHost, unicaster.port);
+ unicastDiscoveryHost, unicast.port, null) :
+ new LookupLocator(unicastDiscoveryHost, unicast.port);
/* register myself */
Item item = new Item(new ServiceItem(myServiceID,
proxy,
@@ -5120,7 +5129,6 @@ class RegistrarImpl implements Registrar
addService(reg);
generateEvents(null, nitem, now);
addLogRecord(new SvcRegisteredLogObj(reg));
- queueEvents();
/* see if the expire thread needs to wake up earlier */
if (reg.leaseExpiration < minSvcExpiration) {
minSvcExpiration = reg.leaseExpiration;
@@ -5583,7 +5591,7 @@ class RegistrarImpl implements Registrar
/* force a re-sort: must remove before changing, then reinsert */
serviceByTime.remove(reg);
reg.leaseExpiration = renewExpiration;
- serviceByTime.put(reg, reg);
+ serviceByTime.add(reg);
/* see if the expire thread needs to wake up earlier */
if (renewExpiration < minSvcExpiration) {
minSvcExpiration = renewExpiration;
@@ -5605,7 +5613,7 @@ class RegistrarImpl implements Registrar
/* force a re-sort: must remove before changing, then reinsert */
serviceByTime.remove(reg);
reg.leaseExpiration = renewExpiration;
- serviceByTime.put(reg, reg);
+ serviceByTime.add(reg);
} finally {
concurrentObj.writeUnlock();
}
@@ -5842,17 +5850,6 @@ class RegistrarImpl implements Registrar
eventNotifierExec.execute(new EventTask(reg, sid, item, transition, proxy, this));
}
- /** Queue all pending EventTasks for processing by the task manager. */
- private void queueEvents() {
-// if (!newNotifies.isEmpty()) {
-// Iterator<EventTask> i = newNotifies.iterator();
-// while (i.hasNext()){
-// eventNotifierExec.execute(i.next());
-// }
-// newNotifies.clear();
-// }
- }
-
/** Generate a new service ID */
private ServiceID newServiceID() {
Uuid uuid = serviceIdGenerator.generate();
@@ -5902,7 +5899,7 @@ class RegistrarImpl implements Registrar
stream.writeInt(unicastPort);
stream.writeObject(memberGroups);
stream.writeObject(lookupGroups);
- stream.writeLong(announcementSeqNo);
+ stream.writeLong(announcementSeqNo.get());
marshalAttributes(lookupAttrs, stream);
marshalLocators(lookupLocators, stream);
for (Iterator iter = serviceByID.entrySet().iterator();
@@ -5969,7 +5966,7 @@ class RegistrarImpl implements Registrar
unicastPort = stream.readInt();
memberGroups = (String[])stream.readObject();
lookupGroups = (String[])stream.readObject();
- announcementSeqNo = stream.readLong() + Integer.MAX_VALUE;
+ announcementSeqNo.set( stream.readLong() + Integer.MAX_VALUE);
lookupAttrs = unmarshalAttributes(stream);
lookupLocators = prepareLocators(
unmarshalLocators(stream), recoveredLocatorPreparer, true);
@@ -6048,9 +6045,9 @@ class RegistrarImpl implements Registrar
logger.log(Level.FINER, "wrote log record {0}",
new Object[]{ rec });
}
- if (++logFileSize >= persistenceSnapshotThreshold) {
+ if (logFileSize.incrementAndGet() >= persistenceSnapshotThreshold) {
int snapshotSize = serviceByID.size() + eventByID.size();
- if (logFileSize >= persistenceSnapshotWeight * snapshotSize) {
+ if (logFileSize.get() >= persistenceSnapshotWeight * snapshotSize) {
concurrentObj.waiterNotify(snapshotNotifier);
}
}
Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/InterruptedStatusThread.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/InterruptedStatusThread.java?rev=1549810&r1=1549809&r2=1549810&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/InterruptedStatusThread.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/InterruptedStatusThread.java Tue Dec 10 11:20:57 2013
@@ -26,14 +26,27 @@ package com.sun.jini.thread;
*/
public class InterruptedStatusThread extends Thread {
+ /**
+ * A Runnable task can implement this to be interrupted if something
+ * special needs to be done to cause the Runnable to notice it's been
+ * interrupted.
+ *
+ * @since 3.0.0
+ */
+ public interface Interruptable {
+ public void interrupt();
+ }
+
/** true if thread has been interrupted */
private boolean interrupted = false;
+ private final Interruptable task;
/**
* Constructs a new <code>InterruptedStatusThread</code> object.
*/
public InterruptedStatusThread() {
super();
+ task = null;
}
/**
@@ -42,6 +55,8 @@ public class InterruptedStatusThread ext
*/
public InterruptedStatusThread(Runnable target) {
super(target);
+ if (target instanceof Interruptable) task = (Interruptable) target;
+ else task = null;
}
/**
@@ -51,6 +66,8 @@ public class InterruptedStatusThread ext
*/
public InterruptedStatusThread(Runnable target, String name) {
super(target, name);
+ if (target instanceof Interruptable) task = (Interruptable) target;
+ else task = null;
}
/**
@@ -59,6 +76,7 @@ public class InterruptedStatusThread ext
*/
public InterruptedStatusThread(String name) {
super(name);
+ task = null;
}
/**
@@ -68,6 +86,8 @@ public class InterruptedStatusThread ext
*/
public InterruptedStatusThread(ThreadGroup group, Runnable target) {
super(group, target);
+ if (target instanceof Interruptable) task = (Interruptable) target;
+ else task = null;
}
/**
@@ -80,6 +100,8 @@ public class InterruptedStatusThread ext
Runnable target,
String name) {
super(group, target, name);
+ if (target instanceof Interruptable) task = (Interruptable) target;
+ else task = null;
}
/**
@@ -95,6 +117,8 @@ public class InterruptedStatusThread ext
String name,
long stackSize) {
super(group, target, name, stackSize);
+ if (target instanceof Interruptable) task = (Interruptable) target;
+ else task = null;
}
/**
@@ -104,11 +128,13 @@ public class InterruptedStatusThread ext
*/
public InterruptedStatusThread(ThreadGroup group, String name) {
super(group, name);
+ task = null;
}
// inherit javadoc
public synchronized void interrupt() {
interrupted = true;
+ if (task != null) task.interrupt();
super.interrupt();
}