You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2012/01/21 08:28:36 UTC
svn commit: r1234278 [17/29] - in /river/tck: ./ configs/ doc/ doc/api/
doc/api/com/ doc/api/com/sun/ doc/api/com/sun/jini/
doc/api/com/sun/jini/compat/ doc/api/com/sun/jini/compat/admin1/
doc/api/com/sun/jini/compat/admin2/ doc/api/com/sun/jini/compat...
Added: river/tck/src/com/sun/jini/compat/reggie/RegistrarImpl.java
URL: http://svn.apache.org/viewvc/river/tck/src/com/sun/jini/compat/reggie/RegistrarImpl.java?rev=1234278&view=auto
==============================================================================
--- river/tck/src/com/sun/jini/compat/reggie/RegistrarImpl.java (added)
+++ river/tck/src/com/sun/jini/compat/reggie/RegistrarImpl.java Sat Jan 21 07:28:27 2012
@@ -0,0 +1,4930 @@
+
+/*
+ *
+ * Copyright 2005 Sun Microsystems, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.sun.jini.compat.reggie;
+
+import java.rmi.RemoteException;
+import java.rmi.NoSuchObjectException;
+import java.rmi.MarshalledObject;
+import java.rmi.UnmarshalException;
+import java.rmi.activation.ActivationID;
+import java.rmi.activation.ActivationDesc;
+import java.rmi.activation.Activatable;
+import java.rmi.activation.ActivationException;
+import java.rmi.activation.ActivationSystem;
+import java.rmi.activation.ActivationGroup;
+import java.rmi.activation.ActivationGroupID;
+import java.rmi.server.RemoteObject;
+import java.rmi.server.UnicastRemoteObject;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.FileNotFoundException;
+import java.io.Serializable;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.MulticastSocket;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import java.util.StringTokenizer;
+import java.security.SecureRandom;
+import java.lang.reflect.Array;
+import com.sun.jini.constants.TimeConstants;
+import com.sun.jini.constants.ThrowableConstants;
+import com.sun.jini.start.SharedActivation;
+import com.sun.jini.thread.ReadersWriter;
+import com.sun.jini.thread.ReadersWriter.ConcurrentLockException;
+import com.sun.jini.thread.TaskManager;
+import com.sun.jini.lookup.entry.BasicServiceType;
+import net.jini.core.entry.Entry;
+import net.jini.core.lease.*;
+import net.jini.core.event.*;
+import net.jini.core.lookup.*;
+import net.jini.lookup.JoinManager;
+import net.jini.lookup.entry.ServiceInfo;
+import net.jini.core.discovery.LookupLocator;
+import net.jini.discovery.*;
+import com.sun.jini.reliableLog.ReliableLog;
+import com.sun.jini.reliableLog.LogException;
+import com.sun.jini.reliableLog.LogHandler;
+
+/**
+ * The server side of an implementation of the lookup service. Multiple
+ * client-side proxy classes are used, for the ServiceRegistrar interface
+ * as well as for leases and administration; their methods transform the
+ * parameters and then make corresponding calls on the Registrar interface
+ * implemented on the server side.
+ *
+ * @author Sun Microsystems, Inc.
+ *
+ */
+public class RegistrarImpl implements Registrar {
+
+ /* ServiceInfo values */
+ private static final String PRODUCT = "Lookup";
+ private static final String MANUFACTURER = "Sun Microsystems, Inc.";
+ private static final String VENDOR = MANUFACTURER;
+ private static final String VERSION =
+ com.sun.jini.constants.VersionConstants.SERVER_VERSION;
+
+ /** Maximum minMax lease duration for both services and events */
+ private static final long MAX_LEASE = 1000L * 60 * 60 * 24 * 365 * 1000;
+ /** Maximum minimum renewal interval */
+ private static final long MAX_RENEW = 1000L * 60 * 60 * 24 * 365;
+ /** Log format version */
+ private static final int LOG_VERSION = 1;
+
+ /** Internet Protocol (IP) addresses of the network interfaces (NICs)
+ * through which multicast packets will be sent.
+ */
+ private InetAddress[] nicAddresses;
+
+ /** Empty attribute set */
+ private static final EntryRep[] emptyAttrs = {};
+
+ /** Proxy for myself */
+ private RegistrarProxy proxy;
+ /** Our service ID */
+ private ServiceID myServiceID;
+ /** Our activation id, if we're activatable */
+ private ActivationID activationID;
+ /** Reference to shared activation object. Used to modify the service's
+ * activation state under a shared activation group environment.
+ */
+ private SharedActivation sharedActivationRef = null;
+ /** Our LookupLocator */
+ private LookupLocator myLocator;
+
+ /**
+ * Map from ServiceID to SvcReg. Every service is in this map under
+ * its serviceID.
+ */
+ private final HashMap serviceByID = new HashMap();
+ /**
+ * Identity map from SvcReg to SvcReg, ordered by lease expiration.
+ * Every service is in this map.
+ */
+ private final TreeMap serviceByTime = new TreeMap();
+ /**
+ * Map from ServiceType to ArrayList(SvcReg). Every service is in
+ * this map under its exact serviceType.
+ */
+ private final HashMap serviceByType = new HashMap();
+ /**
+ * Map from EntryClass to HashMap[] where each HashMap is a map from
+ * Object (field value) to ArrayList(SvcReg). The HashMap array has as
+ * many elements as the EntryClass has fields (including fields defined
+ * by superclasses). Services are in this map multiple times, once
+ * for each field of each entry it has. The outer map is indexed by the
+ * first (highest) superclass that defines the field. This means that a
+ * HashMap[] has null elements for fields defined by superclasses, but
+ * this is a small memory hit and is simpler than subtracting off base
+ * index values when accessing the arrays.
+ */
+ private final HashMap serviceByAttr = new HashMap(23);
+ /**
+ * Map from EntryClass to ArrayList(SvcReg). Services are in this map
+ * multiple times, once for each no-fields entry it has (no fields meaning
+ * none of the superclasses have fields either). The map is indexed by
+ * the exact type of the entry.
+ */
+ private final HashMap serviceByEmptyAttr = new HashMap(11);
+ /** All EntryClasses with non-zero numInstances */
+ private final ArrayList entryClasses = new ArrayList();
+ /**
+ * Map from Long(eventID) to EventReg. Every event registration is in
+ * this map under its eventID.
+ */
+ private final HashMap eventByID = new HashMap(11);
+ /**
+ * Identity map from EventReg to EventReg, ordered by lease expiration.
+ * Every event registration is in this map.
+ */
+ private final TreeMap eventByTime = new TreeMap();
+ /**
+ * Map from ServiceID to EventReg or EventReg[]. An event
+ * registration is in this map if its template matches on (at least)
+ * a specific serviceID.
+ */
+ private final HashMap subEventByService = new HashMap(11);
+ /**
+ * Map from Long(eventID) to EventReg. An event registration is in
+ * this map if its template matches on ANY_SERVICE_ID.
+ */
+ private final HashMap subEventByID = new HashMap(11);
+
+ /** random number generator for UUID generation */
+ private final SecureRandom secRand = new SecureRandom();
+ /** 128-bit buffer for use with secRand */
+ private final byte[] secRandBuf16 = new byte[16];
+ /** 64-bit buffer for use with secRand */
+ private final byte[] secRandBuf8 = new byte[8];
+ /** Event id generator */
+ private long eventID = 0;
+ /** Random number generator for use in lookup */
+ private final Random random = new Random();
+
+ /** ArrayList of pending EventTasks */
+ private final ArrayList newNotifies = new ArrayList();
+
+ /** Current maximum service lease duration granted, in milliseconds. */
+ private long maxServiceLease;
+ /** Current maximum event lease duration granted, in milliseconds. */
+ private long maxEventLease;
+ /** Earliest expiration time of a SvcReg */
+ private long minSvcExpiration = Long.MAX_VALUE;
+ /** Earliest expiration time of an EventReg */
+ private long minEventExpiration = Long.MAX_VALUE;
+
+ /** Manager for discovering other lookup services */
+ private LookupDiscoveryManager discoer;
+ /** Manager for joining other lookup services */
+ private JoinManager joiner;
+ /** Task manager for sending events and discovery responses */
+ private final TaskManager tasker = new TaskManager(10, 1000 * 15, 1.0f);
+ /** Service lease expiration thread */
+ private final Thread serviceExpirer = new ServiceExpireThread();
+ /** Event lease expiration thread */
+ private final Thread eventExpirer = new EventExpireThread();
+ /** Unicast discovery request packet receiving thread */
+ private UnicastThread unicaster;
+ /** Multicast discovery request packet receiving thread */
+ private Thread multicaster;
+ /** Multicast discovery announcement sending thread */
+ private Thread announcer;
+ /** Snapshot-taking thread */
+ private final Thread snapshotter = new SnapshotThread();
+
+ /** Concurrent object to control read and write access */
+ private final ReadersWriter concurrentObj = new ReadersWriter();
+ /** Object for synchronizing with the service expire thread */
+ private final Object serviceNotifier = new Object();
+ /** Object for synchronizing with the event expire thread */
+ private final Object eventNotifier = new Object();
+ /** Object on which the snapshot-taking thread will synchronize */
+ private final Object snapshotNotifier = new Object();
+
+ /** Class resolver */
+ private final ClassResolver resolver = new ClassResolver();
+ /** Canonical ServiceType for java.lang.Object */
+ private ServiceType objectServiceType;
+
+ /** Reliable log object to hold Service and Event Records */
+ private ReliableLog log;
+ /** Flag indicating whether system is in a state of recovery */
+ private boolean inRecovery;
+ /** Current number of records in the Log File since the last snapshot */
+ private int logFileSize = 0;
+
+ /** Name of directory for persistence using ReliableLog */
+ private String logDirname;
+ /** Log File must contain this many records before snapshot allowed */
+ private int logToSnapshotThresh = 200;
+ /** Weight factor applied to snapshotSize when deciding to take snapshot */
+ private float snapshotWt = 10;
+ /** Minimum value for maxServiceLease. */
+ private long minMaxServiceLease = 1000 * 60 * 5;
+ /** Minimum value for maxEventLease. */
+ private long minMaxEventLease = 1000 * 60 * 30;
+ /** Minimum average time between lease renewals, in milliseconds. */
+ private long minRenewalInterval = 100;
+ /** Port for unicast discovery */
+ private int unicastPort = 0;
+ /** The groups we are a member of */
+ private String[] memberGroups = {};
+ /** The groups we should join */
+ private String[] lookupGroups = {};
+ /** The locators of other lookups we should join */
+ private LookupLocator[] lookupLocators = {};
+ /** The attributes to use when joining (including with myself) */
+ private Entry[] lookupAttrs;
+
+ /** Socket timeout for unicast discovery request processing */
+ private int unicastTimeout =
+ Integer.getInteger("com.sun.jini.compat.reggie.unicastTimeout",
+ 1000 * 60).intValue();
+
+ /**
+ * Called by the activation group.
+ *
+ * @param activationID our activation id
+ * @param data log directory name as a MarshalledObject
+ */
+ public RegistrarImpl(ActivationID activationID, MarshalledObject data)
+ throws IOException
+ {
+ this.activationID = activationID;
+ Activatable.exportObject(this, activationID, 0);
+ try {
+ logDirname = (String)data.get();
+ } catch (Exception e) {
+ throw new IllegalArgumentException
+ ("bad directory name in descriptor");
+ }
+ init();
+ }
+
+ /**
+ * Constructs an activatable, persistent instance of the Reggie
+ * implementation of the lookup service that runs in an activation
+ * group (VM) in which other activatable objects may also run.
+ *
+ * @param activationID our activation id
+ * @param data log directory name as a MarshalledObject
+ * @param sharedActivationRef reference to the shared activation object
+ */
+ public RegistrarImpl(ActivationID activationID,
+ MarshalledObject data,
+ SharedActivation sharedActivationRef)
+ throws IOException
+ {
+ this.activationID = activationID;
+ this.sharedActivationRef = sharedActivationRef;
+ Activatable.exportObject(this, activationID, 0);
+ try {
+ logDirname = (String)data.get();
+ } catch (Exception e) {
+ throw new IllegalArgumentException
+ ("bad directory name in descriptor");
+ }
+ init();
+ }
+
+ /**
+ * Called by ServiceStarter.createTransient for transient instances.
+ *
+ * @param logDirname log directory name
+ */
+ RegistrarImpl(String logDirname) throws IOException {
+ UnicastRemoteObject.exportObject(this);
+ this.logDirname = logDirname;
+ init();
+ }
+
+ /** A service item registration record. */
+ private final static class SvcReg implements Comparable, Serializable {
+
+ private static final long serialVersionUID = -1626838158255069853L;
+
+ /**
+ * The service item.
+ *
+ * @serial
+ */
+ public final Item item;
+ /**
+ * The lease id.
+ *
+ * @serial
+ */
+ public final long leaseID;
+ /**
+ * The lease expiration time.
+ *
+ * @serial
+ */
+ public long leaseExpiration;
+
+ /** Simple constructor */
+ public SvcReg(Item item, long leaseID, long leaseExpiration) {
+ this.item = item;
+ this.leaseID = leaseID;
+ this.leaseExpiration = leaseExpiration;
+ }
+
+ /**
+ * Primary sort by leaseExpiration, secondary by leaseID. The
+ * secondary sort is immaterial, except to ensure a total order
+ * (required by TreeMap).
+ */
+ public int compareTo(Object obj) {
+ SvcReg reg = (SvcReg)obj;
+ if (this == reg)
+ return 0;
+ if (leaseExpiration < reg.leaseExpiration ||
+ (leaseExpiration == reg.leaseExpiration &&
+ leaseID < reg.leaseID))
+ return -1;
+ return 1;
+ }
+ }
+
+ /** An event registration record. */
+ private final static class EventReg implements Comparable, Serializable {
+
+ private static final long serialVersionUID = -1549670962624946202L;
+
+ /**
+ * The event id.
+ * @serial
+ */
+ public final long eventID;
+ /**
+ * The lease id.
+ * @serial
+ */
+ public final long leaseID;
+ /**
+ * The template to match.
+ * @serial
+ */
+ public final Template tmpl;
+ /**
+ * The transitions.
+ *
+ * @serial
+ */
+ public final int transitions;
+ /**
+ * The current sequence number.
+ *
+ * @serial
+ */
+ public long seqNo;
+ /**
+ * The event listener.
+ */
+ public transient RemoteEventListener listener;
+ /**
+ * The handback object.
+ *
+ * @serial
+ */
+ public final MarshalledObject handback;
+ /**
+ * The lease expiration time.
+ *
+ * @serial
+ */
+ public long leaseExpiration;
+
+ /** Simple constructor */
+ public EventReg(long eventID, long leaseID, Template tmpl,
+ int transitions, RemoteEventListener listener,
+ MarshalledObject handback, long leaseExpiration) {
+ this.eventID = eventID;
+ this.leaseID = leaseID;
+ this.tmpl = tmpl;
+ this.transitions = transitions;
+ this.seqNo = 0;
+ this.listener = listener;
+ this.handback = handback;
+ this.leaseExpiration = leaseExpiration;
+ }
+
+ /**
+ * Primary sort by leaseExpiration, secondary by eventID. The
+ * secondary sort is immaterial, except to ensure a total order
+ * (required by TreeMap).
+ */
+ public int compareTo(Object obj) {
+ EventReg reg = (EventReg)obj;
+ if (this == reg)
+ return 0;
+ if (leaseExpiration < reg.leaseExpiration ||
+ (leaseExpiration == reg.leaseExpiration &&
+ eventID < reg.eventID))
+ return -1;
+ return 1;
+ }
+
+ /**
+ * @serialData RemoteEventListener as a MarshalledObject
+ */
+ private void writeObject(ObjectOutputStream stream)
+ throws IOException
+ {
+ stream.defaultWriteObject();
+ stream.writeObject(new MarshalledObject(listener));
+ }
+
+ /**
+ * Unmarshals the event listener.
+ */
+ private void readObject(ObjectInputStream stream)
+ throws IOException, ClassNotFoundException
+ {
+ stream.defaultReadObject();
+ MarshalledObject mo = (MarshalledObject)stream.readObject();
+ try {
+ listener = (RemoteEventListener)mo.get();
+ } catch (Throwable e) {
+ if (e instanceof Error &&
+ ThrowableConstants.retryable(e) ==
+ ThrowableConstants.BAD_OBJECT)
+ throw (Error)e;
+ }
+ }
+ }
+
+ /**
+ * Interface defining the method(s) that must be implemented by each of
+ * the concrete LogObj classes. This allows for the definition of
+ * object-dependent invocations of the appropriate implementation of
+ * the method(s) declared in this interface.
+ */
+ private static interface LogRecord extends Serializable {
+ void apply(RegistrarImpl regImpl);
+ }
+
+ /**
+ * LogObj class whose instances are recorded to the log file whenever
+ * a new service is registered.
+ *
+ * @see RegistrarImpl.LocalLogHandler
+ */
+ private static class SvcRegisteredLogObj implements LogRecord {
+
+ private static final long serialVersionUID = 833681392248560066L;
+
+ /**
+ * The service registration.
+ *
+ * @serial
+ */
+ private SvcReg reg;
+
+ /** Simple constructor */
+ public SvcRegisteredLogObj(SvcReg reg) {
+ this.reg = reg;
+ }
+
+ /**
+ * Modifies the state of the Registrar by registering the service
+ * stored in the reg object. Also needs to delete any existing
+ * service with the same serviceID; this can happen if a service
+ * re-registers while an existing registration is in effect, because
+ * we don't log a separate lease cancellation record for the existing
+ * registration in that case.
+ *
+ * @see RegistrarImpl.LocalLogHandler#applyUpdate
+ */
+ public void apply(RegistrarImpl regImpl) {
+ regImpl.resolver.resolve(reg.item);
+ SvcReg oldReg =
+ (SvcReg)regImpl.serviceByID.get(reg.item.serviceID);
+ if (oldReg != null)
+ regImpl.deleteService(oldReg, 0);
+ regImpl.addService(reg);
+ }
+ }
+
+ /**
+ * LogObj class whose instances are recorded to the log file whenever
+ * new attributes are added to an existing service in the Registrar.
+ *
+ * @see RegistrarImpl.LocalLogHandler
+ */
+ private static class AttrsAddedLogObj implements LogRecord {
+
+ private static final long serialVersionUID = -5347182410011723905L;
+
+ /**
+ * The service id.
+ *
+ * @serial
+ */
+ private ServiceID serviceID;
+ /**
+ * The lease id.
+ *
+ * @serial
+ */
+ private long leaseID;
+ /**
+ * The attributes added.
+ *
+ * @serial
+ */
+ private EntryRep[] attrSets;
+
+ /** Simple constructor */
+ public AttrsAddedLogObj(ServiceID serviceID,
+ long leaseID,
+ EntryRep[] attrSets)
+ {
+ this.serviceID = serviceID;
+ this.leaseID = leaseID;
+ this.attrSets = attrSets;
+ }
+
+ /**
+ * Modifies the state of the Registrar by adding to all of the
+ * services matching the template, the attributes stored in
+ * attributeSets.
+ *
+ * @see RegistrarImpl.LocalLogHandler#applyUpdate
+ */
+ public void apply(RegistrarImpl regImpl) {
+ try {
+ regImpl.addAttributesDo(serviceID, leaseID, attrSets);
+ } catch (UnknownLeaseException e) {
+ /* this exception should never occur when recovering */
+ }
+ }
+ }
+
+ /**
+ * LogObj class whose instances are recorded to the log file whenever
+ * existing attributes of an existing service in the Registrar are
+ * modified.
+ *
+ * @see RegistrarImpl.LocalLogHandler
+ */
+ private static class AttrsModifiedLogObj implements LogRecord {
+
+ private static final long serialVersionUID = -2773350506956661576L;
+
+ /**
+ * The service id.
+ *
+ * @serial
+ */
+ private ServiceID serviceID;
+ /**
+ * The lease id.
+ *
+ * @serial
+ */
+ private long leaseID;
+ /**
+ * The templates to match.
+ * @serial
+ */
+ private EntryRep[] attrSetTmpls;
+ /**
+ * The new attributes.
+ *
+ * @serial
+ */
+ private EntryRep[] attrSets;
+
+ /** Simple constructor */
+ public AttrsModifiedLogObj(ServiceID serviceID,
+ long leaseID,
+ EntryRep[] attrSetTmpls,
+ EntryRep[] attrSets)
+ {
+ this.serviceID = serviceID;
+ this.leaseID = leaseID;
+ this.attrSetTmpls = attrSetTmpls;
+ this.attrSets = attrSets;
+ }
+
+ /**
+ * Modifies the state of the Registrar by modifying the attributes
+ * of the services that match the template with the attributes
+ * stored in attributeSets.
+ *
+ * @see RegistrarImpl.LocalLogHandler#applyUpdate
+ */
+ public void apply(RegistrarImpl regImpl) {
+ try {
+ regImpl.modifyAttributesDo(serviceID, leaseID,
+ attrSetTmpls, attrSets);
+ } catch (UnknownLeaseException e) {
+ /* this exception should never occur when recovering */
+ }
+ }
+ }
+
+ /**
+ * LogObj class whose instances are recorded to the log file whenever
+ * new attributes are set on an existing service in the Registrar.
+ *
+ * @see RegistrarImpl.LocalLogHandler
+ */
+ private static class AttrsSetLogObj implements LogRecord {
+
+ private static final long serialVersionUID = -395979667535255420L;
+
+ /**
+ * The service id.
+ *
+ * @serial
+ */
+ private ServiceID serviceID;
+ /**
+ * The lease id.
+ *
+ * @serial
+ */
+ private long leaseID;
+ /**
+ * The new attributes.
+ *
+ * @serial
+ */
+ private EntryRep[] attrSets;
+
+ /** Simple constructor */
+ public AttrsSetLogObj(ServiceID serviceID,
+ long leaseID,
+ EntryRep[] attrSets)
+ {
+ this.serviceID = serviceID;
+ this.leaseID = leaseID;
+ this.attrSets = attrSets;
+ }
+
+ /**
+ * Modifies the state of the Registrar by replacing the attributes
+ * of the services matching the template with the attributes stored
+ * in attributeSets.
+ *
+ * @see RegistrarImpl.LocalLogHandler#applyUpdate
+ */
+ public void apply(RegistrarImpl regImpl) {
+ try {
+ regImpl.setAttributesDo(serviceID, leaseID, attrSets);
+ } catch (UnknownLeaseException e) {
+ /* this exception should never occur when recovering */
+ }
+ }
+ }
+
+ /**
+ * LogObj class whose instances are recorded to the log file whenever
+ * a new event is registered.
+ *
+ * @see RegistrarImpl.LocalLogHandler
+ */
+ private static class EventRegisteredLogObj implements LogRecord {
+
+ private static final long serialVersionUID = -807655888250060611L;
+
+ /**
+ * The event registration.
+ *
+ * @serial
+ */
+ private EventReg eventReg;
+
+ /** Simple constructor */
+ public EventRegisteredLogObj(EventReg eventReg) {
+ this.eventReg = eventReg;
+ }
+
+ /**
+ * Modifies the state of the Registrar by registering the event
+ * stored in the eventReg object; and by updating both the event
+ * sequence number and the event ID.
+ *
+ * @see RegistrarImpl.LocalLogHandler#applyUpdate
+ */
+ public void apply(RegistrarImpl regImpl) {
+ regImpl.resolver.resolve(eventReg.tmpl);
+ eventReg.seqNo += Integer.MAX_VALUE;
+ regImpl.addEvent(eventReg);
+ regImpl.eventID++;
+ }
+ }
+
+ /**
+ * LogObj class whose instances are recorded to the log file whenever
+ * a lease on an existing service in the Registrar is cancelled.
+ *
+ * @see RegistrarImpl.LocalLogHandler
+ */
+ private static class ServiceLeaseCancelledLogObj implements LogRecord {
+
+ private static final long serialVersionUID = 8363406735506378972L;
+
+ /**
+ * The service id.
+ *
+ * @serial
+ */
+ private ServiceID serviceID;
+ /**
+ * The lease id.
+ *
+ * @serial
+ */
+ private long leaseID;
+
+ /** Simple constructor */
+ public ServiceLeaseCancelledLogObj(ServiceID serviceID, long leaseID) {
+ this.serviceID = serviceID;
+ this.leaseID = leaseID;
+ }
+
+ /**
+ * Modifies the state of the Registrar by cancelling the lease
+ * having ID equal to the contents of the leaseID field; and
+ * corresponding to the service with ID equal to the contents of
+ * the serviceID field.
+ *
+ * @see RegistrarImpl.LocalLogHandler#applyUpdate
+ */
+ public void apply(RegistrarImpl regImpl) {
+ try {
+ regImpl.cancelServiceLeaseDo(serviceID, leaseID);
+ } catch (UnknownLeaseException e) {
+ /* this exception should never occur when recovering */
+ }
+ }
+ }
+
+ /**
+ * LogObj class whose instances are recorded to the log file whenever
+ * a lease on an existing service in the Registrar is renewed.
+ *
+ * @see RegistrarImpl.LocalLogHandler
+ */
+ private static class ServiceLeaseRenewedLogObj implements LogRecord {
+
+ private static final long serialVersionUID = -6941618092365889931L;
+
+ /**
+ * The service id.
+ *
+ * @serial
+ */
+ private ServiceID serviceID;
+ /**
+ * The lease id.
+ *
+ * @serial
+ */
+ private long leaseID;
+ /**
+ * The new lease expiration time.
+ *
+ * @serial
+ */
+ private long leaseExpTime;
+
+ /** Simple constructor */
+ public ServiceLeaseRenewedLogObj(ServiceID serviceID,
+ long leaseID,
+ long leaseExpTime)
+ {
+ this.serviceID = serviceID;
+ this.leaseID = leaseID;
+ this.leaseExpTime = leaseExpTime;
+ }
+
+ /**
+ * Modifies the state of the Registrar by renewing the lease
+ * having ID equal to the contents of the leaseID field; and
+ * corresponding to the service with ID equal to the contents
+ * of the serviceID field.
+ *
+ * @see RegistrarImpl.LocalLogHandler#applyUpdate
+ */
+ public void apply(RegistrarImpl regImpl) {
+ regImpl.renewServiceLeaseAbs(serviceID, leaseID, leaseExpTime);
+ }
+ }
+
+ /**
+ * LogObj class whose instances are recorded to the log file whenever
+ * a lease on a registered event is cancelled.
+ *
+ * @see RegistrarImpl.LocalLogHandler
+ */
+ private static class EventLeaseCancelledLogObj implements LogRecord {
+
+ private static final long serialVersionUID = 723479933309720973L;
+
+ /**
+ * The event id.
+ *
+ * @serial
+ */
+ private long eventID;
+ /**
+ * The lease id.
+ *
+ * @serial
+ */
+ private long leaseID;
+
+ /** Simple constructor */
+ public EventLeaseCancelledLogObj(long eventID, long leaseID) {
+ this.eventID = eventID;
+ this.leaseID = leaseID;
+ }
+
+ /**
+ * Modifies the state of the Registrar by cancelling the lease
+ * corresponding to the event with ID equal to the contents of
+ * the eventID field.
+ *
+ * @see RegistrarImpl.LocalLogHandler#applyUpdate
+ */
+ public void apply(RegistrarImpl regImpl) {
+ try {
+ regImpl.cancelEventLeaseDo(eventID, leaseID);
+ } catch (UnknownLeaseException e) {
+ /* this exception should never occur when recovering */
+ }
+ }
+ }
+
+ /**
+ * LogObj class whose instances are recorded to the log file whenever
+ * a lease on a registered event is renewed.
+ *
+ * @see RegistrarImpl.LocalLogHandler
+ */
+ private static class EventLeaseRenewedLogObj implements LogRecord {
+
+ private static final long serialVersionUID = -2313983070714702699L;
+
+ /**
+ * The event id.
+ *
+ * @serial
+ */
+ private long eventID;
+ /**
+ * The lease id.
+ *
+ * @serial
+ */
+ private long leaseID;
+ /**
+ * The new lease expiration time.
+ *
+ * @serial
+ */
+ private long leaseExpTime;
+
+ /** Simple constructor */
+ public EventLeaseRenewedLogObj(long eventID,
+ long leaseID,
+ long leaseExpTime)
+ {
+ this.eventID = eventID;
+ this.leaseID = leaseID;
+ this.leaseExpTime = leaseExpTime;
+ }
+
+ /**
+ * Modifies the state of the Registrar by renewing the lease
+ * corresponding to the event with ID equal to the contents of
+ * the eventID field.
+ *
+ * @see RegistrarImpl.LocalLogHandler#applyUpdate
+ */
+ public void apply(RegistrarImpl regImpl) {
+ regImpl.renewEventLeaseAbs(eventID, leaseID, leaseExpTime);
+ }
+ }
+
+ /**
+ * LogObj class whose instances are recorded to the log file whenever
+ * a leases in the Registrar is renewed via a LeaseMap.
+ *
+ * @see RegistrarImpl.LocalLogHandler
+ */
+ private static class LeasesRenewedLogObj implements LogRecord {
+
+ private static final long serialVersionUID = -2796802215410373050L;
+
+ /**
+ * The service and event ids.
+ *
+ * @serial
+ */
+ private Object[] regIDs;
+ /**
+ * The lease ids.
+ *
+ * @serial
+ */
+ private long[] leaseIDs;
+ /**
+ * The new lease expiration times.
+ *
+ * @serial
+ */
+ private long[] leaseExpTimes;
+
+ /** Simple constructor */
+ public LeasesRenewedLogObj(Object[] regIDs,
+ long[] leaseIDs,
+ long[] leaseExpTimes)
+ {
+ this.regIDs = regIDs;
+ this.leaseIDs = leaseIDs;
+ this.leaseExpTimes = leaseExpTimes;
+ }
+
+ /**
+ * Modifies the state of the Registrar by renewing the specified
+ * leases.
+ *
+ * @see RegistrarImpl.LocalLogHandler#applyUpdate
+ */
+ public void apply(RegistrarImpl regImpl) {
+ regImpl.renewLeasesAbs(regIDs, leaseIDs, leaseExpTimes);
+ }
+ }
+
+ /**
+ * LogObj class whose instances are recorded to the log file whenever
+ * lease are cancelled via a LeaseMap.
+ *
+ * @see RegistrarImpl.LocalLogHandler
+ */
+ private static class LeasesCancelledLogObj implements LogRecord {
+
+ private static final long serialVersionUID = 7462419701214242565L;
+
+ /**
+ * The service and event ids.
+ *
+ * @serial
+ */
+ private Object[] regIDs;
+ /**
+ * The lease ids.
+ *
+ * @serial
+ */
+ private long[] leaseIDs;
+
+ /** Simple constructor */
+ public LeasesCancelledLogObj(Object[] regIDs, long[] leaseIDs) {
+ this.regIDs = regIDs;
+ this.leaseIDs = leaseIDs;
+ }
+
+ /**
+ * Modifies the state of the Registrar by cancelling the specified
+ * leases.
+ *
+ * @see RegistrarImpl.LocalLogHandler#applyUpdate
+ */
+ public void apply(RegistrarImpl regImpl) {
+ /* Exceptions can be returned, since we didn't weed out unknown
+ * leases before logging, but we can just ignore them anyway.
+ */
+ regImpl.cancelLeasesDo(regIDs, leaseIDs);
+ }
+ }
+
+ /**
+ * LogObj class whose instances are recorded to the log file whenever
+ * the Unicast Port Number is set to a new value.
+ * <p>
+ * Note: the apply() method of this class merely sets the private field
+ * unicastPort. This means that during a recovery, the unicaster
+ * thread will be created with this new port number ONLY IF that
+ * thread is created AFTER recovery is complete. Thus, it is
+ * important that at re-initialization during a re-activation
+ * of the Registrar, the recovery() method is invoked before
+ * the unicaster thread is created.
+ *
+ * @see RegistrarImpl.LocalLogHandler
+ */
+ private static class UnicastPortSetLogObj implements LogRecord {
+
+ private static final long serialVersionUID = -5923066193628958723L;
+
+ /**
+ * The new port number.
+ *
+ * @serial
+ */
+ private int newPort;
+
+ /** Simple constructor */
+ public UnicastPortSetLogObj(int newPort) {
+ this.newPort = newPort;
+ }
+
+ /**
+ * Modifies the state of the Registrar by setting the value of the
+ * private unicastPort field to the value of the newPort field.
+ *
+ * @see RegistrarImpl.LocalLogHandler#applyUpdate
+ */
+ public void apply(RegistrarImpl regImpl) {
+ regImpl.unicastPort = newPort;
+ }
+ }
+
+ /**
+ * LogObj class whose instances are recorded to the log file whenever
+ * the Minimum Maximum Service Lease Duration is set to a new value.
+ *
+ * @see RegistrarImpl.LocalLogHandler
+ */
+ private static class MinMaxServiceLeaseSetLogObj implements LogRecord {
+
+ private static final long serialVersionUID = 3205940543108355772L;
+
+ /**
+ * The new minimum maximum service lease duration.
+ *
+ * @serial
+ */
+ private long newLeaseDuration;
+
+ /** Simple constructor */
+ public MinMaxServiceLeaseSetLogObj(long newLeaseDuration) {
+ this.newLeaseDuration = newLeaseDuration;
+ }
+
+ /**
+ * Modifies the state of the Registrar by setting the value of
+ * the private minMaxServiceLease field to the value of the
+ * newLeaseDuration field.
+ *
+ * @see RegistrarImpl.LocalLogHandler#applyUpdate
+ */
+ public void apply(RegistrarImpl regImpl) {
+ regImpl.minMaxServiceLease = newLeaseDuration;
+ }
+ }
+
+ /**
+ * LogObj class whose instances are recorded to the log file whenever
+ * the Minimum Maximum Event Lease Duration is set to a new value.
+ *
+ * @see RegistrarImpl.LocalLogHandler
+ */
+ private static class MinMaxEventLeaseSetLogObj implements LogRecord {
+
+ private static final long serialVersionUID = -6425690586422726211L;
+
+ /**
+ * The new minimum maximum event lease duration.
+ *
+ * @serial
+ */
+ private long newLeaseDuration;
+
+ /** Simple constructor */
+ public MinMaxEventLeaseSetLogObj(long newLeaseDuration) {
+ this.newLeaseDuration = newLeaseDuration;
+ }
+
+ /**
+ * Modifies the state of the Registrar by setting the value of
+ * the private minMaxEventLease field to the value of the
+ * newLeaseDuration field.
+ *
+ * @see RegistrarImpl.LocalLogHandler#applyUpdate
+ */
+ public void apply(RegistrarImpl regImpl) {
+ regImpl.minMaxEventLease = newLeaseDuration;
+ }
+ }
+
+ /**
+ * LogObj class whose instances are recorded to the log file whenever
+ * the Minimum Renewal Interval is set to a new value.
+ *
+ * @see RegistrarImpl.LocalLogHandler
+ */
+ private static class MinRenewalIntervalSetLogObj implements LogRecord {
+
+ private static final long serialVersionUID = 5812923613520666861L;
+
+ /**
+ * The new minimum renewal interval.
+ *
+ * @serial
+ */
+ private long newRenewalInterval;
+
+ /** Simple constructor */
+ public MinRenewalIntervalSetLogObj(long newRenewalInterval) {
+ this.newRenewalInterval = newRenewalInterval;
+ }
+
+ /**
+ * Modifies the state of the Registrar by setting the value of
+ * the private minRenewalInterval field to the value of the
+ * newRenewalInterval.
+ *
+ * @see RegistrarImpl.LocalLogHandler#applyUpdate
+ */
+ public void apply(RegistrarImpl regImpl) {
+ regImpl.minRenewalInterval = newRenewalInterval;
+ }
+ }
+
+ /**
+ * LogObj class whose instances are recorded to the log file whenever
+ * the Weight Factor applied to the size of the Snapshot File in the
+ * "snapshot determination" expression is set to a new value.
+ *
+ * @see RegistrarImpl.LocalLogHandler
+ */
+ private static class SnapshotWeightSetLogObj implements LogRecord {
+
+ private static final long serialVersionUID = -7068462033891338457L;
+
+ /**
+ * The new snapshot weight factor.
+ *
+ * @serial
+ */
+ private float newWeight;
+
+ /** Simple constructor */
+ public SnapshotWeightSetLogObj(float newWeight) {
+ this.newWeight = newWeight;
+ }
+
+ /**
+ * Modifies the state of the Registrar by setting the value of the
+ * private snapshotWt field to the value of the newWeight field.
+ *
+ * @see RegistrarImpl.LocalLogHandler#applyUpdate
+ */
+ public void apply(RegistrarImpl regImpl) {
+ regImpl.snapshotWt = newWeight;
+ }
+ }
+
+ /**
+ * LogObj class whose instances are recorded to the log file whenever
+ * the set of groups to join is changed.
+ *
+ * @see RegistrarImpl.LocalLogHandler
+ */
+ private static class LookupGroupsChangedLogObj implements LogRecord {
+
+ private static final long serialVersionUID = -5164130449456011085L;
+
+ /**
+ * The new groups to join.
+ *
+ * @serial
+ */
+ private String[] groups;
+
+ /** Simple constructor */
+ public LookupGroupsChangedLogObj(String[] groups) {
+ this.groups = groups;
+ }
+
+ /**
+ * Modifies the state of the Registrar by setting the private
+ * field lookupGroups to the reference to the groups field.
+ *
+ * @see RegistrarImpl.LocalLogHandler#applyUpdate
+ */
+ public void apply(RegistrarImpl regImpl) {
+ regImpl.lookupGroups = groups;
+ }
+ }
+
+ /**
+ * LogObj class whose instances are recorded to the log file whenever
+ * the set of locators of lookup services to join is changed.
+ *
+ * @see RegistrarImpl.LocalLogHandler
+ */
+ private static class LookupLocatorsChangedLogObj implements LogRecord {
+
+ private static final long serialVersionUID = 7707774807971026109L;
+
+ /**
+ * The new locators to join.
+ *
+ * @serial
+ */
+ private LookupLocator[] locators;
+
+ /** Simple constructor */
+ public LookupLocatorsChangedLogObj(LookupLocator[] locators) {
+ this.locators = locators;
+ }
+
+ /**
+ * Modifies the state of the Registrar by setting the private
+ * field lookupLocators to the reference to the locators field.
+ *
+ * @see RegistrarImpl.LocalLogHandler#applyUpdate
+ */
+ public void apply(RegistrarImpl regImpl) {
+ regImpl.lookupLocators = locators;
+ }
+ }
+
+ /**
+ * LogObj class whose instances are recorded to the log file whenever
+ * the Threshold employed in the "snapshot determination" expression
+ * is set to a new value.
+ *
+ * @see RegistrarImpl.LocalLogHandler
+ */
+ private static class LogToSnapshotThresholdSetLogObj implements LogRecord {
+
+ private static final long serialVersionUID = 7141538629320197819L;
+
+ /**
+ * The new snapshot threshold.
+ *
+ * @serial
+ */
+ private int newThreshold;
+
+ /** Simple constructor */
+ public LogToSnapshotThresholdSetLogObj(int newThreshold) {
+ this.newThreshold = newThreshold;
+ }
+
+ /**
+ * Modifies the state of the Registrar by setting the value
+ * of the private logToSnapshotThresh field to the value of
+ * the newThreshold field.
+ *
+ * @see RegistrarImpl.LocalLogHandler#applyUpdate
+ */
+ public void apply(RegistrarImpl regImpl) {
+ regImpl.logToSnapshotThresh = newThreshold;
+ }
+ }
+
+ /**
+ * LogObj class whose instances are recorded to the log file whenever
+ * the memberGroups array is set to reference a new array of strings.
+ *
+ * @see RegistrarImpl.LocalLogHandler
+ */
+ private static class MemberGroupsChangedLogObj implements LogRecord {
+
+ private static final long serialVersionUID = 4764341386996269738L;
+
+ /**
+ * The new groups to be a member of.
+ *
+ * @serial
+ */
+ private String[] groups;
+
+ /** Simple constructor */
+ public MemberGroupsChangedLogObj(String[] groups) {
+ this.groups = groups;
+ }
+
+ /**
+ * Modifies the state of the Registrar by setting the private
+ * memberGroups field to the reference to the groups field.
+ *
+ * @see RegistrarImpl.LocalLogHandler#applyUpdate
+ */
+ public void apply(RegistrarImpl regImpl) {
+ regImpl.memberGroups = groups;
+ }
+ }
+
+ /**
+ * Handler class for the persistent storage facility.
+ * <p>
+ * At any point during processing in the Registrar, there will exist
+ * both a 'snapshot' of the Registrar's state and a set of records
+ * detailing each significant change that has occurred to the state
+ * since the snapshot was taken. The snapshot information and the
+ * incremental change information will be stored in separate files
+ * called, respectively, the snapshot file and the log file. Together,
+ * these files are used to recover the state of the Registrar after a
+ * crash or a network outage (or if the Registrar or its ActivationGroup
+ * is un-registered and then re-registered through the Activation Daemon).
+ * <p>
+ * This class contains the methods that are used to record and recover
+ * the snapshot of the Registrar's state; as well as the method used to
+ * apply the state changes that were recorded in the log file.
+ * <p>
+ * When the ReliableLog class is instantiated, a new instance of this
+ * class is passed to its constructor so that the methods of this
+ * class may be invoked by the methods defined in the ReliableLog.
+ * Because this class extends the LogHandler class associated with
+ * the ReliableLog class, this class must provide implementations of
+ * the abstract methods declared in the LogHandler. Also, some of the
+ * methods defined in this class override the methods of the LogHandler
+ * in order to customize the handling of snapshot creation and
+ * retrieval.
+ * <p>
+ * Each significant change to the Registrar's state is written to the
+ * log file as an individual record (when addLogRecord() is invoked).
+ * After the number of records logged exceeds a pre-defined threshold,
+ * a snapshot of the state is recorded by invoking -- through the
+ * ReliableLog and its LogHandler -- the snapshot() method defined in
+ * this class. After the snapshot is taken, the log file is cleared
+ * and the incremental log process starts over.
+ * <p>
+ * The contents of the snapshot file reflect the DATA contained in
+ * the fields making up the current state of the Registrar. That data
+ * represents many changes -- over time -- to the Registrar's state.
+ * On the other hand, each record written to the log file is an object
+ * that reflects both the data used and the ACTIONS taken to make one
+ * change to the Registrar's state at a particular point in time.
+ * <p>
+ * The data written to the snapshot file is shown below:
+ * <ul>
+ * <li> our service ID
+ * <li> current lease ID
+ * <li> current event ID
+ * <li> current configuration parameters
+ * <li> contents of the container holding the current registered services
+ * <li> null (termination 'marker' for the set of registered services)
+ * <li> contents of the container holding the current registered events
+ * <li> null (termination 'marker' for the set of registered events)
+ * </ul>
+ * The type of state changes that will generate a new record in the log
+ * file are:
+ * <ul>
+ * <li> a new service was registered
+ * <li> a new event was registered
+ * <li> new attributes were added to an existing service
+ * <li> existing attributes of a service were modified
+ * <li> a service lease was cancelled
+ * <li> a service lease was renewed
+ * <li> an event lease was cancelled
+ * <li> an event lease was renewed
+ * <li> a configuration parameter was changed
+ * </ul>
+ * During recovery, the state of the Registrar at the time of a crash
+ * or outage is re-constructed by first retrieving the 'base' state from
+ * the snapshot file; and then modifying that base state according to
+ * the records retrieved from the log file. The reconstruction of the
+ * base state is achieved by invoking the recover() method defined in
+ * this class. The modifications recorded in the log file are then
+ * applied to the base state by invoking the applyUpdate() method
+ * defined in this class. Both recover() and applyUpdate() are invoked
+ * through the ReliableLog and its associated LogHandler.
+ * <p>
+ * NOTE: The following lines must be added to the Registrar's policy file
+ * <pre>
+ * permission java.io.FilePermission "dirname", "read,write,delete";
+ * permission java.io.FilePermission "dirname/-", "read,write,delete";
+ * </pre>
+ * where 'dirname' is the name of the directory path (relative or
+ * absolute) where the snapshot and log file will be maintained.
+ */
+ private class LocalLogHandler extends LogHandler {
+
+ /** Simple constructor */
+ public LocalLogHandler() { }
+
+ /* Overrides snapshot() defined in ReliableLog's LogHandler class. */
+ public void snapshot(OutputStream out) throws IOException {
+ takeSnapshot(out);
+ }
+
+ /* Overrides recover() defined in ReliableLog's LogHandler class. */
+ public void recover(InputStream in)
+ throws IOException, ClassNotFoundException
+ {
+ recoverSnapshot(in);
+ }
+
+ /**
+ * Required method implementing the abstract applyUpdate()
+ * defined in ReliableLog's associated LogHandler class.
+ * <p>
+ * During state recovery, the recover() method defined in the
+ * ReliableLog class is invoked. That method invokes the method
+ * recoverUpdates() which invokes the method readUpdates(). Both
+ * of those methods are defined in ReliableLog. The method
+ * readUpdates() retrieves a record from the log file and then
+ * invokes this method.
+ * <p>
+ * This method invokes the version of the method apply() that
+ * corresponds to the particular type of 'log record' object
+ * that is input as the first argument. The log record object and its
+ * corresponding apply() method are defined in one of the so-called
+ * LogObj classes. Any instance of one the LogObj classes is an
+ * implementation of the LogRecord interface. The particular
+ * implementation that is input to this method is dependent on the
+ * type of record that was originally logged. The apply() method
+ * will then modify the state of the Registrar in a way dictated
+ * by the type of record that was retrieved.
+ */
+ public void applyUpdate(Object logRecObj) {
+ ((LogRecord)logRecObj).apply(RegistrarImpl.this);
+ }
+ }
+
+ /** Base class for iterating over all Items that match a Template. */
+ private abstract class ItemIter {
+ /** Current time */
+ public final long now = System.currentTimeMillis();
+ /** True means duplicate items are possible */
+ public boolean dupsPossible = false;
+ /** Template to match */
+ protected final Template tmpl;
+ /** Next item to return */
+ protected SvcReg reg;
+
+ /** Subclass constructors must initialize reg */
+ protected ItemIter(Template tmpl) {
+ this.tmpl = tmpl;
+ }
+
+ /** Returns true if the iteration has more elements. */
+ public boolean hasNext() {
+ return reg != null;
+ }
+
+ /** Returns the next element in the iteration as an Item. */
+ public Item next() {
+ if (reg == null)
+ throw new NoSuchElementException();
+ Item item = reg.item;
+ step();
+ return item;
+ }
+
+ /** Returns the next element in the iteration as a SvcReg. */
+ public SvcReg nextReg() {
+ if (reg == null)
+ throw new NoSuchElementException();
+ SvcReg cur = reg;
+ step();
+ return cur;
+ }
+
+ /** Set reg to the next matching element, or null if none */
+ protected abstract void step();
+ }
+
+ /** Iterate over all Items. */
+ private class AllItemIter extends ItemIter {
+ /** Iterator over serviceByID */
+ private final Iterator iter;
+
+ /** Assumes the empty template */
+ public AllItemIter() {
+ super(null);
+ iter = serviceByID.values().iterator();
+ step();
+ }
+
+ /** Set reg to the next matching element, or null if none */
+ protected void step() {
+ while (iter.hasNext()) {
+ reg = (SvcReg)iter.next();
+ if (reg.leaseExpiration > now)
+ return;
+ }
+ reg = null;
+ }
+ }
+
+ /** Iterate over all matching Items by ServiceType. */
+ private class TypeItemIter extends ItemIter {
+ /** Concrete classes matching tmpl.serviceTypes */
+ private final ServiceType[] types;
+ /** Current index into types */
+ private int typeidx;
+ /** SvcRegs obtained from serviceByType for current service type */
+ private ArrayList svcs;
+ /** Current index into svcs */
+ private int svcidx = 0;
+
+ /**
+ * tmp.serviceID == null and
+ * tmpl.serviceTypes is non-empty
+ */
+ public TypeItemIter(Template tmpl) {
+ super(tmpl);
+ types = matchingConcreteClasses(tmpl.serviceTypes);
+ typeidx = types.length;
+ step();
+ }
+
+ /** Set reg to the next matching element, or null if none */
+ protected void step() {
+ do {
+ while (--svcidx >= 0) {
+ reg = (SvcReg)svcs.get(svcidx);
+ if (reg.leaseExpiration > now &&
+ matchAttributes(tmpl, reg.item))
+ return;
+ }
+ } while (stepType());
+ reg = null;
+ }
+
+ /**
+ * Step to the next type in types, if any, reset svcs and svcidx,
+ * and return false if types exhausted.
+ */
+ private boolean stepType() {
+ if (--typeidx < 0)
+ return false;
+ svcs = (ArrayList)serviceByType.get(types[typeidx]);
+ svcidx = svcs.size();
+ return true;
+ }
+ }
+
+ /** Iterate over all matching Items by attribute value. */
+ private class AttrItemIter extends ItemIter {
+ /** SvcRegs obtained from serviceByAttr for chosen attr */
+ protected ArrayList svcs;
+ /** Current index into svcs */
+ protected int svcidx;
+
+ /**
+ * tmpl.serviceID == null and
+ * tmpl.serviceTypes is empty and
+ * tmpl.attributeSetTemplates[setidx].fields[fldidx] != null
+ */
+ public AttrItemIter(Template tmpl, int setidx, int fldidx) {
+ super(tmpl);
+ EntryRep set = tmpl.attributeSetTemplates[setidx];
+ HashMap[] attrMaps =
+ (HashMap[])serviceByAttr.get(getDefiningClass(set.eclass,
+ fldidx));
+ if (attrMaps != null && attrMaps[fldidx] != null) {
+ svcs = (ArrayList)attrMaps[fldidx].get(set.fields[fldidx]);
+ if (svcs != null) {
+ svcidx = svcs.size();
+ step();
+ }
+ }
+ }
+
+ /** Simple constructor */
+ protected AttrItemIter(Template tmpl) {
+ super(tmpl);
+ }
+
+ /** Set reg to the next matching element, or null if none. */
+ protected void step() {
+ while (--svcidx >= 0) {
+ reg = (SvcReg)svcs.get(svcidx);
+ if (reg.leaseExpiration > now &&
+ matchAttributes(tmpl, reg.item))
+ return;
+ }
+ reg = null;
+ }
+ }
+
+ /** Iterate over all matching Items by no-fields entry class. */
+ private class EmptyAttrItemIter extends AttrItemIter {
+
+ /**
+ * tmpl.serviceID == null and
+ * tmpl.serviceTypes is empty and
+ * eclass has no fields
+ */
+ public EmptyAttrItemIter(Template tmpl, EntryClass eclass) {
+ super(tmpl);
+ svcs = (ArrayList)serviceByEmptyAttr.get(eclass);
+ if (svcs != null) {
+ svcidx = svcs.size();
+ step();
+ }
+ }
+ }
+
+ /** Iterate over all matching Items by entry class, dups possible. */
+ private class ClassItemIter extends ItemIter {
+ /** Entry class to match on */
+ private final EntryClass eclass;
+ /** Current index into entryClasses */
+ private int classidx;
+ /** Values iterator for current HashMap */
+ private Iterator iter;
+ /** SvcRegs obtained from iter or serviceByEmptyAttr */
+ private ArrayList svcs;
+ /** Current index into svcs */
+ private int svcidx = 0;
+
+ /**
+ * tmpl.serviceID == null and
+ * tmpl.serviceTypes is empty and
+ * tmpl.attributeSetTemplates is non-empty
+ */
+ public ClassItemIter(Template tmpl) {
+ super(tmpl);
+ dupsPossible = true;
+ eclass = tmpl.attributeSetTemplates[0].eclass;
+ classidx = entryClasses.size();
+ step();
+ }
+
+ /** Set reg to the next matching element, or null if none */
+ protected void step() {
+ do {
+ while (--svcidx >= 0) {
+ reg = (SvcReg)svcs.get(svcidx);
+ if (reg.leaseExpiration > now &&
+ matchAttributes(tmpl, reg.item))
+ return;
+ }
+ } while (stepValue());
+ reg = null;
+ }
+
+ /**
+ * Step to the next HashMap value, if any, reset svcs and svcidx,
+ * and return false if everything exhausted.
+ */
+ private boolean stepValue() {
+ while (true) {
+ if (iter != null && iter.hasNext()) {
+ svcs = (ArrayList)iter.next();
+ svcidx = svcs.size();
+ return true;
+ }
+ if (!stepClass())
+ return false;
+ if (iter == null)
+ return true;
+ }
+ }
+
+ /**
+ * Step to the next matching entry class, if any, reset iter
+ * using the HashMap for the last field of the class (and reset
+ * (svcs and svcidx if the entry class has no fields), and
+ * return false if everything exhausted.
+ */
+ private boolean stepClass() {
+ while (--classidx >= 0) {
+ EntryClass cand = (EntryClass)entryClasses.get(classidx);
+ if (!eclass.isAssignableFrom(cand))
+ continue;
+ if (cand.getNumFields() > 0) {
+ cand = getDefiningClass(cand, cand.getNumFields() - 1);
+ HashMap[] attrMaps = (HashMap[])serviceByAttr.get(cand);
+ iter = attrMaps[attrMaps.length - 1].values().iterator();
+ } else {
+ iter = null;
+ svcs = (ArrayList)serviceByEmptyAttr.get(cand);
+ svcidx = svcs.size();
+ }
+ return true;
+ }
+ return false;
+ }
+ }
+
+ /** Iterate over a singleton matching Item by serviceID. */
+ private class IDItemIter extends ItemIter {
+
+ /** tmpl.serviceID != null */
+ public IDItemIter(Template tmpl) {
+ super(tmpl);
+ reg = (SvcReg)serviceByID.get(tmpl.serviceID);
+ if (reg != null &&
+ (reg.leaseExpiration <= now || !matchItem(tmpl, reg.item)))
+ reg = null;
+ }
+
+ /** Set reg to null */
+ protected void step() {
+ reg = null;
+ }
+ }
+
+ /** An event to be sent, and the listener to send it to. */
+ private final class EventTask implements TaskManager.Task {
+
+ /** The event registration */
+ public final EventReg reg;
+ /** The sequence number of this event */
+ public final long seqNo;
+ /** The service id */
+ public final ServiceID sid;
+ /** The new state of the item, or null if deleted */
+ public final Item item;
+ /** The transition that fired */
+ public final int transition;
+
+ /** Simple constructor, except increments reg.seqNo. */
+ public EventTask(EventReg reg,
+ ServiceID sid,
+ Item item,
+ int transition)
+ {
+ this.reg = reg;
+ seqNo = ++reg.seqNo;
+ this.sid = sid;
+ this.item = item;
+ this.transition = transition;
+ }
+
+ /** Send the event */
+ public void run() {
+ try {
+ reg.listener.notify(new RegistrarEvent(proxy, reg.eventID,
+ seqNo, reg.handback,
+ sid, transition, item));
+ } catch (Throwable e) {
+ switch (ThrowableConstants.retryable(e)) {
+ case ThrowableConstants.BAD_OBJECT:
+ if (e instanceof Error)
+ throw (Error)e;
+ case ThrowableConstants.BAD_INVOCATION:
+ case ThrowableConstants.UNCATEGORIZED:
+ /* If the listener throws UnknownEvent or some other
+ * definite exception, we can cancel the lease.
+ */
+ try {
+ cancelEventLease(reg.eventID, reg.leaseID);
+ } catch (UnknownLeaseException ee) {
+ }
+ }
+ }
+ }
+
+ /** Keep events going to the same listener ordered. */
+ public boolean runAfter(List tasks, int size) {
+ for (int i = size; --i >= 0; ) {
+ Object obj = tasks.get(i);
+ if (obj instanceof EventTask &&
+ reg.listener.equals(((EventTask)obj).reg.listener))
+ return true;
+ }
+ return false;
+ }
+ }
+
+ /** Address for unicast discovery response. */
+ private final class AddressTask implements TaskManager.Task {
+
+ /** The address */
+ public final InetAddress addr;
+ /** The port */
+ public final int port;
+
+ /** Simple constructor */
+ public AddressTask(InetAddress addr, int port) {
+ this.addr = addr;
+ this.port = port;
+ }
+
+ public int hashCode() {
+ return addr.hashCode();
+ }
+
+ /** Two tasks are equal if they have the same address and port */
+ public boolean equals(Object obj) {
+ if (!(obj instanceof AddressTask))
+ return false;
+ AddressTask ua = (AddressTask)obj;
+ return addr.equals(ua.addr) && port == ua.port;
+ }
+
+ /** Connect and then process a unicast discovery request */
+ public void run() {
+ try {
+ respond(new Socket(addr, port));
+ } catch (IOException e) {
+ } catch (SecurityException e) {
+ }
+ }
+
+ /** No ordering */
+ public boolean runAfter(List tasks, int size) {
+ return false;
+ }
+ }
+
+ /** Socket for unicast discovery response. */
+ private final class SocketTask implements TaskManager.Task {
+
+ /** The socket */
+ public final Socket socket;
+
+ /** Simple constructor */
+ public SocketTask(Socket socket) {
+ this.socket = socket;
+ }
+
+ /** Process a unicast discovery request */
+ public void run() {
+ respond(socket);
+ }
+
+ /** No ordering */
+ public boolean runAfter(List tasks, int size) {
+ return false;
+ }
+ }
+
+ /** Service lease expiration thread code */
+ private class ServiceExpireThread extends Thread {
+ /** True if thread has been interrupted */
+ private boolean interrupted = false;
+
+ /** Create a daemon thread */
+ public ServiceExpireThread() {
+ super("service expire");
+ setDaemon(true);
+ }
+
+ public synchronized void interrupt() {
+ interrupted = true;
+ super.interrupt();
+ }
+
+ public synchronized boolean isInterrupted() {
+ return interrupted;
+ }
+
+ public void run() {
+ try {
+ concurrentObj.writeLock();
+ } catch (ConcurrentLockException e) {
+ return;
+ }
+ try {
+ while (!isInterrupted()) {
+ long now = System.currentTimeMillis();
+ while (true) {
+ SvcReg reg = (SvcReg)serviceByTime.firstKey();
+ minSvcExpiration = reg.leaseExpiration;
+ if (minSvcExpiration > now)
+ break;
+ deleteService(reg, now);
+ addLogRecord(new ServiceLeaseCancelledLogObj(
+ reg.item.serviceID, reg.leaseID));
+ }
+ queueEvents();
+ try {
+ concurrentObj.writerWait(serviceNotifier,
+ minSvcExpiration - now);
+ } catch (ConcurrentLockException e) {
+ return;
+ }
+ }
+ } finally {
+ concurrentObj.writeUnlock();
+ }
+ }
+ }
+
+ /** Event lease expiration thread code */
+ private class EventExpireThread extends Thread {
+
+ /** Create a daemon thread */
+ public EventExpireThread() {
+ super("event expire");
+ setDaemon(true);
+ }
+
+ public void run() {
+ try {
+ concurrentObj.writeLock();
+ } catch (ConcurrentLockException e) {
+ return;
+ }
+ try {
+ while (!isInterrupted()) {
+ long now = System.currentTimeMillis();
+ minEventExpiration = Long.MAX_VALUE;
+ while (!eventByTime.isEmpty()) {
+ EventReg reg = (EventReg)eventByTime.firstKey();
+ if (reg.leaseExpiration > now) {
+ minEventExpiration = reg.leaseExpiration;
+ break;
+ }
+ deleteEvent(reg);
+ }
+ try {
+ concurrentObj.writerWait(eventNotifier,
+ minEventExpiration - now);
+ } catch (ConcurrentLockException e) {
+ return;
+ }
+ }
+ } finally {
+ concurrentObj.writeUnlock();
+ }
+ }
+ }
+
+ /**
+ * Termination thread code. We do this in a separate thread to
+ * avoid deadlock, because Activatable.inactive will block until
+ * in-progress RMI calls are finished.
+ */
+ private class DestroyThread extends Thread {
+ /** Maximum delay for unexport attempts */
+ private static final long MAX_UNEXPORT_DELAY = 2*TimeConstants.MINUTES;
+
+ /** Create a non-daemon thread */
+ public DestroyThread() {
+ super("destroy");
+ /* override inheritance from RMI daemon thread */
+ setDaemon(false);
+ }
+
+ public void run() {
+
+ /* must unregister before unexport */
+ if (activationID != null) {
+ try {
+ Activatable.unregister(activationID);
+ } catch (RemoteException e) {
+ /* give up until we can at least unregister */
+ return;
+ } catch (ActivationException e) {
+ }
+ }
+ final long endTime = System.currentTimeMillis()+MAX_UNEXPORT_DELAY;
+ boolean unexported = false;
+ try {
+ /* Unexport only if there are no pending or in progress calls*/
+ while(!unexported && (System.currentTimeMillis() < endTime)) {
+ unexported = UnicastRemoteObject.unexportObject
+ (RegistrarImpl.this,false);
+ if(!unexported) Thread.yield();
+ }//end loop
+ } catch (NoSuchObjectException e) {
+ unexported = true;
+ }
+ if(!unexported) {//Not yet unexported. Forcibly unexport
+ try {
+ UnicastRemoteObject.unexportObject
+ (RegistrarImpl.this,true);
+ } catch (NoSuchObjectException e) { }
+ }//endif
+ /* all daemons must terminate before deleting persistent store */
+ serviceExpirer.interrupt();
+ eventExpirer.interrupt();
+ unicaster.interrupt();
+ multicaster.interrupt();
+ announcer.interrupt();
+ snapshotter.interrupt();
+ tasker.terminate();
+ joiner.terminate();
+ discoer.terminate();
+ try {
+ serviceExpirer.join();
+ eventExpirer.join();
+ unicaster.join();
+ multicaster.join();
+ announcer.join();
+ snapshotter.join();
+ } catch (InterruptedException e) {
+ }
+ closeRequestSockets(tasker.getPending());
+ log.deletePersistentStore();
+ if (activationID != null) {
+ /* inactive will set current group ID to null */
+ ActivationGroupID gid = ActivationGroup.currentGroupID();
+ try {
+ Activatable.inactive(activationID);
+ } catch (RemoteException e) {
+ } catch (ActivationException e) {
+ }
+ /* If group is not 'shared', must destroy the group (VM) */
+ if (sharedActivationRef == null) {//not in a 'shared' group
+ try {
+ ActivationGroup.getSystem().unregisterGroup(gid);
+ } catch (RemoteException e) {
+ } catch (ActivationException e) {
+ }
+ }
+ }
+ }
+ }
+
+ /** Multicast discovery request thread code. */
+ private class MulticastThread extends Thread {
+ /** Multicast socket to receive packets */
+ private final MulticastSocket socket;
+
+ /**
+ * Create a high priority daemon thread. Set up the socket now
+ * rather than in run, so that we get any exception up front.
+ */
+ public MulticastThread() throws IOException {
+ super("multicast request");
+ setDaemon(true);
+ setPriority(Thread.MAX_PRIORITY);
+ socket = new MulticastSocket(Constants.discoveryPort);
+ if((nicAddresses != null) && (nicAddresses.length > 0)) {
+ for(int i=0;i<nicAddresses.length;i++) {
+ socket.setInterface(nicAddresses[i]);
+ socket.joinGroup(Constants.getRequestAddress());
+ }//end loop
+ } else {//use default interface
+ socket.joinGroup(Constants.getRequestAddress());
+ }//endif
+ }
+
+ /** True if thread has been interrupted */
+ private boolean interrupted = false;
+
+ /* This is a workaround for Thread.interrupt not working on
+ * MulticastSocket.receive on all platforms.
+ */
+ public synchronized void interrupt() {
+ interrupted = true;
+ socket.close();
+ }
+
+ public synchronized boolean isInterrupted() {
+ return interrupted;
+ }
+
+ public void run() {
+ byte[] buf = new byte[576];
+ DatagramPacket dgram = new DatagramPacket(buf, buf.length);
+ while (!isInterrupted()) {
+ try {
+ dgram.setLength(buf.length);
+ try {
+ socket.receive(dgram);
+ } catch (NullPointerException e) {
+ break; // workaround for bug 4190513
+ }
+ IncomingMulticastRequest req =
+ new IncomingMulticastRequest(dgram);
+ if ((req.getGroups().length != 0 &&
+ !overlap(memberGroups, req.getGroups())) ||
+ indexOf(req.getServiceIDs(), myServiceID) >= 0)
+ continue;
+ tasker.addIfNew(new AddressTask(req.getAddress(),
+ req.getPort()));
+ } catch (InterruptedIOException e) {
+ break;
+ } catch (IOException e) {
+ } catch (SecurityException e) {
+ }
+ /* if we fail in any way, just forget about it */
+ }
+ socket.close();
+ }
+ }
+
+ /** Unicast discovery request thread code. */
+ private class UnicastThread extends Thread {
+ /** Server socket to accepts connections on. */
+ private ServerSocket listen;
+ /** Listen port */
+ public int port;
+
+ /**
+ * Create a daemon thread. Set up the socket now rather than in run,
+ * so that we get any exception up front.
+ */
+ public UnicastThread(int port) throws IOException {
+ super("unicast request");
+ setDaemon(true);
+ if (port == 0) {
+ try {
+ listen = new ServerSocket(Constants.discoveryPort);
+ this.port = Constants.discoveryPort;
+ return;
+ } catch (IOException e) {
+ }
+ }
+ listen = new ServerSocket(port);
+ this.port = listen.getLocalPort();
+ }
+
+ /** True if thread has been interrupted */
+ private boolean interrupted = false;
+
+ /* This is a workaround for Thread.interrupt not working on
+ * ServerSocket.accept on all platforms. ServerSocket.close
+ * can't be used as a workaround, because it also doesn't work
+ * on all platforms.
+ */
+ public synchronized void interrupt() {
+ interrupted = true;
+ try {
+ (new Socket(InetAddress.getLocalHost(), port)).close();
+ } catch (IOException e) {
+ }
+ }
+
+ public synchronized boolean isInterrupted() {
+ return interrupted;
+ }
+
+ public void run() {
+ while (!isInterrupted()) {
+ try {
+ Socket socket = listen.accept();
+ if (isInterrupted()) {
+ try {
+ socket.close();
+ } catch (IOException e) {
+ }
+ break;
+ }
+ tasker.add(new SocketTask(socket));
+ } catch (InterruptedIOException e) {
+ break;
+ } catch (IOException e) {
+ } catch (SecurityException e) {
+ }
+ /* if we fail in any way, just forget about it */
+ }
+ try {
+ listen.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+
+ /** Multicast discovery announcement thread code. */
+ private class AnnounceThread extends Thread {
+ /** Multicast socket to send packets on */
+ private final MulticastSocket socket;
+
+ /**
+ * Create a daemon thread. Set up the socket now rather than in run,
+ * so that we get any exception up front.
+ */
+ public AnnounceThread() throws IOException {
+ super("discovery announcement");
+ setDaemon(true);
+ socket = new MulticastSocket();
+ socket.setTimeToLive(Integer.getInteger("net.jini.discovery.ttl",
+ 15).intValue());
+ }
+
+ public synchronized void run() {
+ int interval = Integer.getInteger("net.jini.discovery.announce",
+ 1000 * 60 * 2).intValue();
+ try {
+ while (!isInterrupted() && announce(memberGroups)) {
+ wait(interval);
+ }
+ } catch (InterruptedException e) {
+ }
+ if (memberGroups.length > 0)
+ announce(new String[0]);//send NO_GROUPS just before shutdown
+ socket.close();
+ }
+
+ /**
+ * Announce membership in the specified groups, and return false if
+ * interrupted, otherwise return true.
+ */
+ private boolean announce(String[] groups) {
+ try {
+ DatagramPacket[] dgrams =
+ OutgoingMulticastAnnouncement.marshal(myServiceID,
+ myLocator,
+ groups);
+ try {
+ sendPacketByNIC(socket,dgrams,nicAddresses);
+ } catch (InterruptedIOException e) {
+ return false;
+ }
+ } catch (InterruptedIOException e) {
+ return false;
+ } catch (IOException e) {
+ }
+ return true;
+ }
+ }
+
+ /**
+ * Snapshot-taking thread.
+ * <p>
+ * A snapshot is taken when -- after writing a new record to the
+ * log file -- it is determined that the size of the log file has
+ * exceeded a certain threshold. The code which adds the new record
+ * to the log file and which, in turn, decides that a snapshot
+ * must be taken is "wrapped" in a writer mutex. That is, synchronization
+ * of processing is achieved in the Registrar through a "reader/writer"
+ * mutex construct. This construct allows only one writer at any one
+ * time; but allows an unlimited number of simultaneous readers as
+ * long as no writer has locked the mutex. During steady-state, it is
+ * anticipated that there will be far more readers (e.g. lookups) in use
+ * than writers (e.g. add/mod/del Attributes). Since the process of
+ * taking a snapshot can be time-consuming, if the whole snapshot-taking
+ * process occupies that single writer mutex, then a significant number
+ * of read requests will be un-necessarily blocked; possibly resulting
+ * in an unacceptable degradation in response time.
+ * <p>
+ * It is for the above reason that the process of taking a snapshot is
+ * performed in a separate thread. The thread waits on the monitor
+ * belonging to the snapshotNotifier instance until it is notified
+ * (or "signalled") that a snapshot must be taken. The notification
+ * is sent by another thread, created by the Registrar, which
+ * determines when the conditions are right for a snapshot. The
+ * notification takes the form of an interrupt indicating that the
+ * Snapshot monitor is available. Although the interrupt is sent
+ * while the writer mutex is locked, the act of sending the notification
+ * is less time-consuming than the act of taking the snapshot itself.
+ * When the thread receives a notification, it awakens and requests a
+ * lock on the reader mutex (this is all done in the readerWait() method).
+ * Because a reader -- not a writer -- mutex is locked, read-only
+ * processes still have access to the system state, so lookups can be
+ * performed; and the reader mutex prevents changes to the data while
+ * the snapshot is in progress.
+ * <p>
+ * Note that the current snapshot is guaranteed to complete before the
+ * next snapshot request is received. This is because even though
+ * the act of taking a snapshot can be viewed as a writer process,
+ * the fact that the next snapshot notification will be wrapped in a
+ * writer mutex, combined with the fact that a writer mutex can not
+ * be locked while a reader mutex is locked, allows the snapshot to
+ * be treated as a reader process.
+ */
+ private class SnapshotThread extends Thread {
+ /** True if thread has been interrupted */
+ private boolean interrupted = false;
+
+ /** Create a daemon thread */
+ public SnapshotThread() {
+ super("snapshot thread");
+ setDaemon(true);
+ }
+
+ public synchronized void interrupt() {
+ interrupted = true;
+ super.interrupt();
+ }
+
+ public synchronized boolean isInterrupted() {
+ return interrupted;
+ }
+
+ public void run() {
+ try {
+ concurrentObj.readLock();
+ } catch (ConcurrentLockException e) {
+ return;
+ }
+ try {
+ while (!isInterrupted()) {
+ try {
+ concurrentObj.readerWait(snapshotNotifier,
+ Long.MAX_VALUE);
+ try {
+ log.snapshot();
+ logFileSize = 0;
+ } catch (Exception e) {
+ if (isInterrupted())
+ return;
+ /* XXX
+ * if taking the snapshot fails for any reason,
+ * then one of the following must be done:
+ * -- output the problem to a file and exit
+ * -- output the problem to a file and continue
+ * -- set an "I have a problem" attribute and
+ * then send a notification
+ * this issue will be addressed at a later time
+ */
+ e.printStackTrace();
+ }
+ } catch (ConcurrentLockException e) {
+ return;
+ }
+ }
+ } finally {
+ concurrentObj.readUnlock();
+ }
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public ServiceRegistration register(Item nitem, long leaseDuration)
+ {
+ concurrentObj.writeLock();
+ try {
+ return registerDo(nitem, leaseDuration);
+ } finally {
+ concurrentObj.writeUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public MarshalledObject lookup(Template tmpl)
+ {
+ concurrentObj.readLock();
+ try {
+ return lookupDo(tmpl);
+ } finally {
+ concurrentObj.readUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public Matches lookup(Template tmpl, int maxMatches)
+ {
+ concurrentObj.readLock();
+ try {
+ return lookupDo(tmpl, maxMatches);
+ } finally {
+ concurrentObj.readUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public EventRegistration notify(Template tmpl,
+ int transitions,
+ RemoteEventListener listener,
+ MarshalledObject handback,
+ long leaseDuration)
+ {
+ concurrentObj.writeLock();
+ try {
+ return notifyDo(tmpl, transitions, listener, handback,
+ leaseDuration);
+ } finally {
+ concurrentObj.writeUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public EntryClassBase[] getEntryClasses(Template tmpl)
+ {
+ concurrentObj.readLock();
+ try {
+ return getEntryClassesDo(tmpl);
+ } finally {
+ concurrentObj.readUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public Object[] getFieldValues(Template tmpl, int setIndex, int field)
+ {
+ concurrentObj.readLock();
+ try {
+ return getFieldValuesDo(tmpl, setIndex, field);
+ } finally {
+ concurrentObj.readUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public ServiceTypeBase[] getServiceTypes(Template tmpl, String prefix)
+ {
+ concurrentObj.readLock();
+ try {
+ return getServiceTypesDo(tmpl, prefix);
+ } finally {
+ concurrentObj.readUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public ServiceID getServiceID() {
+ return myServiceID;
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public LookupLocator getLocator() {
+ return myLocator;
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public Object getAdmin() throws RemoteException {
+ return (new RegistrarAdminProxy(this, myServiceID));
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public void addAttributes(ServiceID serviceID,
+ long leaseID,
+ EntryRep[] attrSets)
+ throws UnknownLeaseException
+ {
+ if (serviceID.equals(myServiceID))
+ throw new SecurityException("privileged service id");
+ concurrentObj.writeLock();
+ try {
+ addAttributesDo(serviceID, leaseID, attrSets);
+ addLogRecord(new AttrsAddedLogObj(serviceID, leaseID, attrSets));
+ queueEvents();
+ } finally {
+ concurrentObj.writeUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public void modifyAttributes(ServiceID serviceID,
+ long leaseID,
+ EntryRep[] attrSetTmpls,
+ EntryRep[] attrSets)
+ throws UnknownLeaseException
+ {
+ if (serviceID.equals(myServiceID))
+ throw new SecurityException("privileged service id");
+ concurrentObj.writeLock();
+ try {
+ modifyAttributesDo(serviceID, leaseID, attrSetTmpls, attrSets);
+ addLogRecord(new AttrsModifiedLogObj(serviceID, leaseID,
+ attrSetTmpls, attrSets));
+ queueEvents();
+ } finally {
+ concurrentObj.writeUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public void setAttributes(ServiceID serviceID,
+ long leaseID,
+ EntryRep[] attrSets)
+ throws UnknownLeaseException
+ {
+ if (serviceID.equals(myServiceID))
+ throw new SecurityException("privileged service id");
+ concurrentObj.writeLock();
+ try {
+ setAttributesDo(serviceID, leaseID, attrSets);
+ addLogRecord(new AttrsSetLogObj(serviceID, leaseID, attrSets));
+ queueEvents();
+ } finally {
+ concurrentObj.writeUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public void cancelServiceLease(ServiceID serviceID, long leaseID)
+ throws UnknownLeaseException
+ {
+ concurrentObj.writeLock();
+ try {
+ cancelServiceLeaseDo(serviceID, leaseID);
+ addLogRecord(new ServiceLeaseCancelledLogObj(serviceID, leaseID));
+ queueEvents();
+ } finally {
+ concurrentObj.writeUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public long renewServiceLease(ServiceID serviceID,
+ long leaseID,
+ long renewDuration)
+ throws UnknownLeaseException
+ {
+ concurrentObj.priorityWriteLock();
+ try {
+ return renewServiceLeaseDo(serviceID, leaseID, renewDuration);
+ /* addLogRecord is in renewServiceLeaseDo */
+ } finally {
+ concurrentObj.writeUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public void cancelEventLease(long eventID, long leaseID)
+ throws UnknownLeaseException
+ {
+ concurrentObj.writeLock();
+ try {
+ cancelEventLeaseDo(eventID, leaseID);
+ addLogRecord(new EventLeaseCancelledLogObj(eventID, leaseID));
+ } finally {
+ concurrentObj.writeUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public long renewEventLease(long eventID, long leaseID, long renewDuration)
+ throws UnknownLeaseException
+ {
+ concurrentObj.priorityWriteLock();
+ try {
+ return renewEventLeaseDo(eventID, leaseID, renewDuration);
+ /* addLogRecord is in renewEventLeaseDo */
+ } finally {
+ concurrentObj.writeUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public RenewResults renewLeases(Object[] regIDs,
+ long[] leaseIDs,
+ long[] renewDurations)
+ {
+ concurrentObj.priorityWriteLock();
+ try {
+ return renewLeasesDo(regIDs, leaseIDs, renewDurations);
+ /* addLogRecord is in renewLeasesDo */
+ } finally {
+ concurrentObj.writeUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public Exception[] cancelLeases(Object[] regIDs, long[] leaseIDs)
+ {
+ concurrentObj.writeLock();
+ try {
+ Exception[] exceptions = cancelLeasesDo(regIDs, leaseIDs);
+ addLogRecord(new LeasesCancelledLogObj(regIDs, leaseIDs));
+ queueEvents();
+ return exceptions;
+ } finally {
+ concurrentObj.writeUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public Entry[] getLookupAttributes() throws RemoteException {
+ concurrentObj.readLock();
+ try {
+ /* no need to clone, never modified once created */
+ return lookupAttrs;
+ } finally {
+ concurrentObj.readUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public void addLookupAttributes(Entry[] attrSets) throws RemoteException {
+ concurrentObj.writeLock();
+ try {
+ joiner.addAttributes(attrSets, true);
+ lookupAttrs = joiner.getAttributes();
+ EntryRep[] attrs = EntryRep.toEntryRep(attrSets, true);
+ try {
+ addAttributesDo(myServiceID, 0, attrs);
+ } catch (UnknownLeaseException e) {
+ }
+ addLogRecord(new AttrsAddedLogObj(myServiceID, 0, attrs));
+ queueEvents();
+ } finally {
+ concurrentObj.writeUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public void modifyLookupAttributes(Entry[] attrSetTemplates,
+ Entry[] attrSets)
+ throws RemoteException
+ {
+ concurrentObj.writeLock();
+ try {
+ joiner.modifyAttributes(attrSetTemplates, attrSets, true);
+ lookupAttrs = joiner.getAttributes();
+ EntryRep[] tmpls = EntryRep.toEntryRep(attrSetTemplates, false);
+ EntryRep[] attrs = EntryRep.toEntryRep(attrSets, false);
+ try {
+ modifyAttributesDo(myServiceID, 0, tmpls, attrs);
+ } catch (UnknownLeaseException e) {
+ }
+ addLogRecord(new AttrsModifiedLogObj(myServiceID, 0,
+ tmpls, attrs));
+ queueEvents();
+ } finally {
+ concurrentObj.writeUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public String[] getLookupGroups() throws RemoteException {
+ concurrentObj.readLock();
+ try {
+ /* no need to clone, never modified once created */
+ return lookupGroups;
+ } finally {
+ concurrentObj.readUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public void addLookupGroups(String[] groups) throws RemoteException {
+ concurrentObj.writeLock();
+ try {
+ try {
+ discoer.addGroups(groups);
+ } catch (IOException e) {
+ throw new RuntimeException(e.toString());
+ }
+ lookupGroups = discoer.getGroups();
+ addLogRecord(new LookupGroupsChangedLogObj(lookupGroups));
+ } finally {
+ concurrentObj.writeUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public void removeLookupGroups(String[] groups) throws RemoteException {
+ concurrentObj.writeLock();
+ try {
+ discoer.removeGroups(groups);
+ lookupGroups = discoer.getGroups();
+ addLogRecord(new LookupGroupsChangedLogObj(lookupGroups));
+ } finally {
+ concurrentObj.writeUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public void setLookupGroups(String[] groups) throws RemoteException {
+ concurrentObj.writeLock();
+ try {
+ try {
+ discoer.setGroups(groups);
+ } catch (IOException e) {
+ throw new RuntimeException(e.toString());
+ }
+ lookupGroups = discoer.getGroups();
+ addLogRecord(new LookupGroupsChangedLogObj(lookupGroups));
+ } finally {
+ concurrentObj.writeUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public LookupLocator[] getLookupLocators() throws RemoteException {
+ concurrentObj.readLock();
+ try {
+ /* no need to clone, never modified once created */
+ return lookupLocators;
+ } finally {
+ concurrentObj.readUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public void addLookupLocators(LookupLocator[] locators)
+ throws RemoteException
+ {
+ concurrentObj.writeLock();
+ try {
+ discoer.addLocators(locators);
+ lookupLocators = discoer.getLocators();
+ addLogRecord(new LookupLocatorsChangedLogObj(lookupLocators));
+ } finally {
+ concurrentObj.writeUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public void removeLookupLocators(LookupLocator[] locators)
+ throws RemoteException
+ {
+ concurrentObj.writeLock();
+ try {
+ discoer.removeLocators(locators);
+ lookupLocators = discoer.getLocators();
+ addLogRecord(new LookupLocatorsChangedLogObj(lookupLocators));
+ } finally {
+ concurrentObj.writeUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public void setLookupLocators(LookupLocator[] locators)
+ throws RemoteException
+ {
+ concurrentObj.writeLock();
+ try {
+ discoer.setLocators(locators);
+ lookupLocators = discoer.getLocators();
+ addLogRecord(new LookupLocatorsChangedLogObj(lookupLocators));
+ } finally {
+ concurrentObj.writeUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public void addMemberGroups(String[] groups) throws RemoteException {
+ concurrentObj.writeLock();
+ try {
+ for (int i = 0; i < groups.length; i++) {
+ if (indexOf(memberGroups, groups[i]) < 0)
+ memberGroups = (String[])arrayAdd(memberGroups, groups[i]);
+ }
+ synchronized (announcer) {
+ announcer.notify();
+ }
+ addLogRecord(new MemberGroupsChangedLogObj(memberGroups));
+ } finally {
+ concurrentObj.writeUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public void removeMemberGroups(String[] groups) throws RemoteException {
+ concurrentObj.writeLock();
+ try {
+ for (int i = 0; i < groups.length; i++) {
+ int j = indexOf(memberGroups, groups[i]);
+ if (j >= 0)
+ memberGroups = (String[])arrayDel(memberGroups, j);
+ }
+ synchronized (announcer) {
+ announcer.notify();
+ }
+ addLogRecord(new MemberGroupsChangedLogObj(memberGroups));
+ } finally {
+ concurrentObj.writeUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public String[] getMemberGroups() {
+ concurrentObj.readLock();
+ try {
+ /* no need to clone, never modified once created */
+ return memberGroups;
+ } finally {
+ concurrentObj.readUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public void setMemberGroups(String[] groups) throws RemoteException {
+ concurrentObj.writeLock();
+ try {
+ memberGroups = (String[])removeDups(groups);
+ addLogRecord(new MemberGroupsChangedLogObj(memberGroups));
+ synchronized (announcer) {
+ announcer.notify();
+ }
+ } finally {
+ concurrentObj.writeUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public int getUnicastPort() throws RemoteException {
+ concurrentObj.readLock();
+ try {
+ return unicastPort;
+ } finally {
+ concurrentObj.readUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public void setUnicastPort(int port) throws IOException,RemoteException {
+ if (port == unicastPort)
+ return;
+ concurrentObj.writeLock();
+ try {
+ if ((port == 0 && unicaster.port == Constants.discoveryPort) ||
+ port == unicaster.port)
+ {
+ unicastPort = port;
+ addLogRecord(new UnicastPortSetLogObj(port));
+ return;
+ }
+ /* create a UnicastThread that listens on the new port */
+ UnicastThread newUnicaster = new UnicastThread(port);
+ /* terminate the current UnicastThread listening on the old port */
+ unicaster.interrupt();
+ try {
+ unicaster.join();
+ } catch (InterruptedException e) { }
+ /* start the UnicastThread listening on the new port */
+ unicaster = newUnicaster;
+ unicaster.start();
+ unicastPort = port;
+ myLocator = new LookupLocator(myLocator.getHost(), unicaster.port);
+ synchronized (announcer) {
+ announcer.notify();
+ }
+ addLogRecord(new UnicastPortSetLogObj(port));
+ } finally {
+ concurrentObj.writeUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public void setMinMaxServiceLease(long leaseDuration)
+ throws RemoteException
+ {
+ concurrentObj.writeLock();
+ try {
+ if (leaseDuration > MAX_LEASE)
+ throw new IllegalArgumentException("max duration exceeded");
+ minMaxServiceLease = leaseDuration;
+ computeMaxLeases();
+ addLogRecord(new MinMaxServiceLeaseSetLogObj(leaseDuration));
+ } finally {
+ concurrentObj.writeUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public long getMinMaxServiceLease() throws RemoteException {
+ concurrentObj.readLock();
+ try {
+ return minMaxServiceLease;
+ } finally {
+ concurrentObj.readUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public void setMinMaxEventLease(long leaseDuration)
+ throws RemoteException
+ {
+ concurrentObj.writeLock();
+ try {
+ if (leaseDuration > MAX_LEASE)
+ throw new IllegalArgumentException("max duration exceeded");
+ minMaxEventLease = leaseDuration;
+ computeMaxLeases();
+ addLogRecord(new MinMaxEventLeaseSetLogObj(leaseDuration));
+ } finally {
+ concurrentObj.writeUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public long getMinMaxEventLease() throws RemoteException {
+ concurrentObj.readLock();
+ try {
+ return minMaxEventLease;
+ } finally {
+ concurrentObj.readUnlock();
+ }
+ }
+
+ // This method's javadoc is inherited from an interface of this class
+ public void setMinRenewalInterval(long interval) throws RemoteException {
+ concurrentObj.writeLock();
+ try {
+ if (interval > MAX_RENEW)
+ throw new IllegalArgumentException("max interval exceeded");
[... 2017 lines stripped ...]