You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2014/05/09 09:03:19 UTC
svn commit: r1593493 [15/24] - in /river/jtsk/skunk/qa_refactor/trunk: qa/
qa/src/com/sun/jini/test/impl/end2end/jssewrapper/
qa/src/com/sun/jini/test/impl/joinmanager/
qa/src/com/sun/jini/test/impl/mahalo/
qa/src/com/sun/jini/test/impl/outrigger/match...
Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java?rev=1593493&r1=1593492&r2=1593493&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java Fri May 9 07:03:18 2014
@@ -1,6086 +1,6092 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.sun.jini.reggie;
-
-import com.sun.jini.config.Config;
-import com.sun.jini.constants.ThrowableConstants;
-import com.sun.jini.constants.VersionConstants;
-import com.sun.jini.discovery.ClientSubjectChecker;
-import com.sun.jini.discovery.Discovery;
-import com.sun.jini.discovery.DiscoveryConstraints;
-import com.sun.jini.discovery.DiscoveryProtocolException;
-import com.sun.jini.discovery.EncodeIterator;
-import com.sun.jini.discovery.MulticastAnnouncement;
-import com.sun.jini.discovery.MulticastRequest;
-import com.sun.jini.discovery.UnicastResponse;
-import com.sun.jini.logging.Levels;
-import com.sun.jini.lookup.entry.BasicServiceType;
-import com.sun.jini.proxy.MarshalledWrapper;
-import com.sun.jini.reliableLog.LogHandler;
-import com.sun.jini.reliableLog.ReliableLog;
-import com.sun.jini.start.LifeCycle;
-import com.sun.jini.thread.InterruptedStatusThread;
-import com.sun.jini.thread.InterruptedStatusThread.Interruptable;
-import com.sun.jini.thread.ReadersWriter;
-import com.sun.jini.thread.ReadersWriter.ConcurrentLockException;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.lang.reflect.Array;
-import java.net.DatagramPacket;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.MulticastSocket;
-import java.net.NetworkInterface;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.net.UnknownHostException;
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-import java.rmi.MarshalledObject;
-import java.rmi.NoSuchObjectException;
-import java.rmi.RemoteException;
-import java.rmi.activation.ActivationException;
-import java.rmi.activation.ActivationID;
-import java.rmi.activation.ActivationSystem;
-import java.security.AccessControlContext;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Random;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import javax.net.ServerSocketFactory;
-import javax.net.SocketFactory;
-import javax.security.auth.Subject;
-import javax.security.auth.login.LoginContext;
-import javax.security.auth.login.LoginException;
-import net.jini.activation.ActivationExporter;
-import net.jini.activation.ActivationGroup;
-import net.jini.config.Configuration;
-import net.jini.config.ConfigurationException;
-import net.jini.config.ConfigurationProvider;
-import net.jini.config.NoSuchEntryException;
-import net.jini.constraint.BasicMethodConstraints;
-import net.jini.core.constraint.InvocationConstraints;
-import net.jini.core.constraint.MethodConstraints;
-import net.jini.core.constraint.RemoteMethodControl;
-import net.jini.core.discovery.LookupLocator;
-import net.jini.core.entry.Entry;
-import net.jini.core.event.EventRegistration;
-import net.jini.core.event.RemoteEventListener;
-import net.jini.core.lease.Lease;
-import net.jini.core.lease.UnknownLeaseException;
-import net.jini.core.lookup.ServiceID;
-import net.jini.core.lookup.ServiceItem;
-import net.jini.core.lookup.ServiceRegistrar;
-import net.jini.core.lookup.ServiceRegistration;
-import net.jini.discovery.Constants;
-import net.jini.discovery.ConstrainableLookupLocator;
-import net.jini.discovery.DiscoveryGroupManagement;
-import net.jini.discovery.DiscoveryLocatorManagement;
-import net.jini.discovery.DiscoveryManagement;
-import net.jini.discovery.LookupDiscoveryManager;
-import net.jini.export.Exporter;
-import net.jini.export.ProxyAccessor;
-import net.jini.id.ReferentUuid;
-import net.jini.id.Uuid;
-import net.jini.id.UuidFactory;
-import net.jini.io.MarshalledInstance;
-import net.jini.io.UnsupportedConstraintException;
-import net.jini.jeri.BasicILFactory;
-import net.jini.jeri.BasicJeriExporter;
-import net.jini.jeri.tcp.TcpServerEndpoint;
-import net.jini.lookup.JoinManager;
-import net.jini.lookup.entry.ServiceInfo;
-import net.jini.security.BasicProxyPreparer;
-import net.jini.security.ProxyPreparer;
-import net.jini.security.TrustVerifier;
-import net.jini.security.proxytrust.ServerProxyTrust;
-import org.apache.river.api.util.Startable;
-import org.apache.river.impl.thread.NamedThreadFactory;
-import org.apache.river.impl.thread.SynchronousExecutors;
-
-/**
- * Base server-side implementation of a lookup service, subclassed by
- * TransientRegistrarImpl and PersistentRegistrarImpl. Multiple client-side
- * proxy classes are used, for the ServiceRegistrar interface as well as for
- * leases and administration; their methods transform the parameters and then
- * make corresponding calls on the Registrar interface implemented on the
- * server side.
- *
- * @author Sun Microsystems, Inc.
- *
- */
-class RegistrarImpl implements Registrar, ProxyAccessor, ServerProxyTrust, Startable {
-
- /** Maximum minMax lease duration for both services and events */
- private static final long MAX_LEASE = 1000L * 60 * 60 * 24 * 365 * 1000;
- /** Maximum minimum renewal interval */
- private static final long MAX_RENEW = 1000L * 60 * 60 * 24 * 365;
- /** Default maximum size of multicast packets to send and receive */
- private static final int DEFAULT_MAX_PACKET_SIZE = 512;
- /** Default time to live value to use for sending multicast packets */
- private static final int DEFAULT_MULTICAST_TTL = 15;
- /** Default timeout to set on sockets used for unicast discovery */
- private static final int DEFAULT_SOCKET_TIMEOUT = 1*60*1000;
- /** Log format version */
- private static final int LOG_VERSION = 3;
- /** Logger and configuration component name */
- private static final String COMPONENT = "com.sun.jini.reggie";
- /** Lease ID always assigned to self */
- private static final Uuid myLeaseID = UuidFactory.create(0L, 0L);
- /** Logger used by this service */
- private static final Logger logger = Logger.getLogger(COMPONENT);
-
- /** Base set of initial attributes for self */
- private static final Entry[] baseAttrs = {
- new ServiceInfo(
- "Lookup", "Sun Microsystems, Inc.", "Sun Microsystems, Inc.",
- VersionConstants.SERVER_VERSION, "", ""),
- new BasicServiceType("Lookup")
- };
- /** Empty attribute set */
- private static final EntryRep[] emptyAttrs = {};
-
- /** Proxy for myself */
- private RegistrarProxy proxy;
- /** Exporter for myself */
- private volatile Exporter serverExporter; // accessed without lock from DestroyThread
- /** Remote reference for myself */
- private Registrar myRef;
- /** Our service ID */
- private volatile ServiceID myServiceID; // accessed without lock from DecodeRequestTask
- /** Our activation id, or null if not activatable */
- private final ActivationID activationID;
- /** Associated activation system, or null if not activatable */
- private final ActivationSystem activationSystem;
- /** Our LookupLocator */
- private volatile LookupLocator myLocator; // accessed without lock from Announce
- /** Our login context, for logging out */
- private final LoginContext loginContext;
- /** Shutdown callback object, or null if no callback needed */
- private final LifeCycle lifeCycle;
-
- /** Unicast socket factories */
- private final ServerSocketFactory serverSocketFactory ;
- private final SocketFactory socketFactory;
-
- /**
- * Map from ServiceID to SvcReg. Every service is in this map under
- * its serviceID.
- */
- private final Map<ServiceID,SvcReg> serviceByID = new HashMap<ServiceID,SvcReg>();
- /**
- * Identity map from SvcReg to SvcReg, ordered by lease expiration.
- * Every service is in this map.
- */
- private final SortedSet<SvcReg> serviceByTime = new TreeSet<SvcReg>();
- /**
- * Map from String to HashMap mapping ServiceID to SvcReg. Every service
- * is in this map under its types.
- */
- private final Map<String,Map<ServiceID,SvcReg>> serviceByTypeName
- = new HashMap<String,Map<ServiceID,SvcReg>>();
- /**
- * Map from EntryClass to HashMap[] where each HashMap is a map from
- * Object (field value) to ArrayList(SvcReg). The HashMap array has as
- * many elements as the EntryClass has fields (including fields defined
- * by superclasses). Services are in this map multiple times, once
- * for each field of each entry it has. The outer map is indexed by the
- * first (highest) superclass that defines the field. This means that a
- * HashMap[] has null elements for fields defined by superclasses, but
- * this is a small memory hit and is simpler than subtracting off base
- * index values when accessing the arrays.
- */
- private final Map<EntryClass,Map<Object,List<SvcReg>>[]> serviceByAttr
- = new HashMap<EntryClass,Map<Object,List<SvcReg>>[]>(23);
- /**
- * Map from EntryClass to ArrayList(SvcReg). Services are in this map
- * multiple times, once for each no-fields entry it has (no fields meaning
- * none of the superclasses have fields either). The map is indexed by
- * the exact type of the entry.
- */
- private final Map<EntryClass,List<SvcReg>> serviceByEmptyAttr
- = new HashMap<EntryClass,List<SvcReg>>(11);
- /** All EntryClasses with non-zero numInstances */
- private final List<EntryClass> entryClasses = new ArrayList<EntryClass>();
- /**
- * Map from Long(eventID) to EventReg. Every event registration is in
- * this map under its eventID.
- */
- private final Map<Long,EventReg> eventByID = new HashMap<Long,EventReg>(11);
- /**
- * Identity map from EventReg to EventReg, ordered by lease expiration.
- * Every event registration is in this map.
- */
- private final SortedMap<EventReg,EventReg> eventByTime = new TreeMap<EventReg,EventReg>();
- /**
- * Map from ServiceID to EventReg or EventReg[]. An event
- * registration is in this map if its template matches on (at least)
- * a specific serviceID.
- */
- private final Map<ServiceID,Object> subEventByService = new HashMap<ServiceID,Object>(11);
- /**
- * Map from Long(eventID) to EventReg. An event registration is in
- * this map if its template matches on ANY_SERVICE_ID.
- */
- private final Map<Long,EventReg> subEventByID = new HashMap<Long,EventReg>(11);
-
- /** Generator for resource (e.g., registration, lease) Uuids */
- private final UuidGenerator resourceIdGenerator;
- /** Generator for service IDs */
- private final UuidGenerator serviceIdGenerator;
- /** Event ID */
- private long eventID = 0; // protected by concurrentObj
- /** Random number generator for use in lookup */
- private final Random random = new Random();
-
- /** Preparer for received remote event listeners */
- private final ProxyPreparer listenerPreparer;
- /** Preparer for remote event listeners recovered from state log */
- private final ProxyPreparer recoveredListenerPreparer;
- /** Preparer for received lookup locators */
- private final ProxyPreparer locatorPreparer;
- /** Preparer for lookup locators recovered from state log */
- private final ProxyPreparer recoveredLocatorPreparer;
-
- /** Current maximum service lease duration granted, in milliseconds. */
- private long maxServiceLease;
- /** Current maximum event lease duration granted, in milliseconds. */
- private long maxEventLease;
- /** Earliest expiration time of a SvcReg */
- private long minSvcExpiration = Long.MAX_VALUE;
- /** Earliest expiration time of an EventReg */
- private long minEventExpiration = Long.MAX_VALUE;
-
- /** Manager for discovering other lookup services */
- private final DiscoveryManagement discoer;
- /** Manager for joining other lookup services */
- private volatile JoinManager joiner; // accessed without lock from DestroyThread
- /** Executors for sending events and discovery responses */
- private final SynchronousExecutors eventNotifierExec;
- private final Map<EventReg,ExecutorService> eventTaskMap;
-// private final EventTaskQueue eventTaskQueue;
- private final ExecutorService discoveryResponseExec;
- /** Service lease expiration thread */
- private final Thread serviceExpirer;
- /** Event lease expiration thread */
- private final Thread eventExpirer;
- /** Unicast discovery request packet receiving thread */
- private volatile Thread unicaster; // accessed without lock from DestroyThread
- private volatile Unicast unicast;
- /** Multicast discovery request packet receiving thread */
- private final Thread multicaster;
- /** Multicast discovery announcement sending thread */
- private final Thread announcer;
- /** Snapshot-taking thread */
- private final Thread snapshotter;
-
- /** Concurrent object to control read and write access */
- private final ReadersWriter concurrentObj = new ReadersWriter();
- /** Object for synchronizing with the service expire thread */
- private final Condition serviceNotifier;
- /** Object for synchronizing with the event expire thread */
- private final Condition eventNotifier;
- /** Object on which the snapshot-taking thread will synchronize */
- private final Condition snapshotNotifier;
-
- /** Canonical ServiceType for java.lang.Object */
- private final ServiceType objectServiceType;
-
- /** Log for recovering/storing state, or null if running as transient */
- private final ReliableLog log;
- /** Flag indicating whether system is in a state of recovery */
- private volatile boolean inRecovery;
- /** Current number of records in the Log File since the last snapshot */
- private final AtomicInteger logFileSize = new AtomicInteger();
-
- /** Log file must contain this many records before snapshot allowed */
- private final int persistenceSnapshotThreshold ;
- /** Weight factor applied to snapshotSize when deciding to take snapshot */
- private final float persistenceSnapshotWeight;
- /** Minimum value for maxServiceLease. */
- private final long minMaxServiceLease;
- /** Minimum value for maxEventLease. */
- private final long minMaxEventLease;
- /** Minimum average time between lease renewals, in milliseconds. */
- private final long minRenewalInterval;
- /** Port for unicast discovery */
- private volatile int unicastPort;
- /** The groups we are a member of */
- private volatile String[] memberGroups; // accessed from DecodeRequestTask and Announce
- /** The groups we should join */
- private volatile String[] lookupGroups;
- /** The locators of other lookups we should join */
- private volatile LookupLocator[] lookupLocators;
- /** The attributes to use when joining (including with myself) */
- private volatile Entry[] lookupAttrs;
- /** Interval to wait in between sending multicast announcements */
- private final long multicastAnnouncementInterval;
- /** Multicast announcement sequence number */
- private final AtomicLong announcementSeqNo = new AtomicLong();
-
- /** Network interfaces to use for multicast discovery */
- private final NetworkInterface[] multicastInterfaces;
- /** Flag indicating whether network interfaces were explicitly specified */
- private final boolean multicastInterfacesSpecified;
- /** Interval to wait in between retrying failed interfaces */
- private final int multicastInterfaceRetryInterval;
- /** Utility for participating in version 2 of discovery protocols */
- private final Discovery protocol2;
- /** Cached raw constraints associated with unicastDiscovery method*/
- private final InvocationConstraints rawUnicastDiscoveryConstraints;
- /** Constraints specified for incoming multicast requests */
- private final DiscoveryConstraints multicastRequestConstraints;
- /** Constraints specified for outgoing multicast announcements */
- private final DiscoveryConstraints multicastAnnouncementConstraints;
- /** Constraints specified for handling unicast discovery */
- private final DiscoveryConstraints unicastDiscoveryConstraints;
- /** Client subject checker to apply to incoming multicast requests */
- private final ClientSubjectChecker multicastRequestSubjectChecker;
- /** Maximum time to wait for calls to finish before forcing unexport */
- private final long unexportTimeout;
- /** Time to wait between unexport attempts */
- private final long unexportWait;
- /** Client subject checker to apply to unicast discovery attempts */
- private final ClientSubjectChecker unicastDiscoverySubjectChecker;
-
- // Not required after start is called.
- private String unicastDiscoveryHost;
- private Configuration config;
- private Exception constructionException;
- private AccessControlContext context;
-
-
- /**
- * Constructs RegistrarImpl based on a configuration obtained using the
- * provided string arguments. If activationID is non-null, the created
- * RegistrarImpl runs as activatable; if persistent is true, it
- * persists/recovers its state to/from disk. A RegistrarImpl instance
- * cannot be constructed as both activatable and non-persistent. If
- * lifeCycle is non-null, its unregister method is invoked during shutdown.
- */
- RegistrarImpl(String[] configArgs,
- final ActivationID activationID,
- final boolean persistent,
- final LifeCycle lifeCycle)
- throws Exception
- {
- this(ConfigurationProvider.getInstance(
- configArgs, RegistrarImpl.class.getClassLoader())
- ,activationID,persistent,lifeCycle);
- }
-
- /**
- * Constructs RegistrarImpl based on the
- * Configuration argument. If activationID is non-null, the created
- * RegistrarImpl runs as activatable; if persistent is true, it
- * persists/recovers its state to/from disk. A RegistrarImpl instance
- * cannot be constructed as both activatable and non-persistent. If
- * lifeCycle is non-null, its unregister method is invoked during shutdown.
- */
- RegistrarImpl(final Configuration config,
- final ActivationID activationID,
- final boolean persistent,
- final LifeCycle lifeCycle)
- throws Exception
- {
- this(loginAndRun(config,activationID,persistent,lifeCycle));
- }
-
-
-
- private static Initializer loginAndRun( final Configuration config,
- final ActivationID activationID,
- final boolean persistent,
- final LifeCycle lifeCycle)
- throws Exception
- {
-
- Initializer result = null;
- try {
- if (activationID != null && !persistent) {
- throw new IllegalArgumentException();
- }
- final LoginContext loginContext = (LoginContext) config.getEntry(
- COMPONENT, "loginContext", LoginContext.class, null);
-
- PrivilegedExceptionAction<Initializer> init = new PrivilegedExceptionAction<Initializer>() {
- public Initializer run() throws Exception {
- return new Initializer(config,
- activationID, persistent, lifeCycle, loginContext);
- }
- };
- if (loginContext != null) {
- loginContext.login();
- try {
- result = Subject.doAsPrivileged(
- loginContext.getSubject(), init, null);
- } catch (PrivilegedActionException e) {
- throw e.getCause();
- }
- } else {
- result = init.run();
- }
- return result;
- } catch (Throwable t) {
- logger.log(Level.SEVERE, "Reggie initialization failed", t);
- if (t instanceof Exception) {
- throw (Exception) t;
- } else {
- throw (Error) t;
- }
- }
- }
-
- private RegistrarImpl(Initializer init){
- this.snapshotNotifier = concurrentObj.newCondition();
- this.eventNotifier = concurrentObj.newCondition();
- this.serviceNotifier = concurrentObj.newCondition();
- lifeCycle = init.lifeCycle;
- serverSocketFactory = init.serverSocketFactory;
- persistenceSnapshotThreshold = init.persistenceSnapshotThreshold;
- socketFactory = init.socketFactory;
- recoveredListenerPreparer = init.recoveredListenerPreparer;
- persistenceSnapshotWeight = init.persistenceSnapshotWeight;
- recoveredLocatorPreparer = init.recoveredLocatorPreparer;
- inRecovery = init.inRecovery;
- activationID = init.activationID;
- activationSystem = init.activationSystem;
- serverExporter = init.serverExporter;
- lookupGroups = init.lookupGroups;
- lookupLocators = init.lookupLocators;
- memberGroups = init.memberGroups;
- unicastPort = init.unicastPort;
- lookupAttrs = init.lookupAttrs;
- discoer = init.discoer;
- listenerPreparer = init.listenerPreparer;
- locatorPreparer = init.locatorPreparer;
- minMaxEventLease = init.minMaxEventLease;
- minMaxServiceLease = init.minMaxServiceLease;
- minRenewalInterval = init.minRenewalInterval;
- multicastAnnouncementInterval = init.multicastAnnouncementInterval;
- multicastInterfaceRetryInterval = init.multicastInterfaceRetryInterval;
- multicastInterfaces = init.multicastInterfaces;
- multicastInterfacesSpecified = init.multicastInterfacesSpecified;
- resourceIdGenerator = init.resourceIdGenerator;
- serviceIdGenerator = init.serviceIdGenerator;
- unexportTimeout = init.unexportTimeout;
- unexportWait = init.unexportWait;
- objectServiceType = init.objectServiceType;
- unicastDiscoverySubjectChecker = init.unicastDiscoverySubjectChecker;
- protocol2 = init.protocol2;
- rawUnicastDiscoveryConstraints = init.rawUnicastDiscoveryConstraints;
- multicastRequestConstraints = init.multicastRequestConstraints;
- multicastAnnouncementConstraints = init.multicastAnnouncementConstraints;
- unicastDiscoveryConstraints = init.unicastDiscoveryConstraints;
- context = init.context;
- eventNotifierExec = new SynchronousExecutors(init.scheduledExecutor);
- eventTaskMap = new TreeMap<EventReg,ExecutorService>();
- discoveryResponseExec = init.executor;
- ReliableLog log = null;
- Thread serviceExpirer = null;
- Thread eventExpirer = null;
- Thread unicaster = null;
- Thread multicaster = null;
- Thread announcer = null;
- Thread snapshotter = null;
-
- try {
- // Create threads with correct login context.
- List<Thread> threads = AccessController.doPrivileged(new PrivilegedExceptionAction<List<Thread>>(){
-
- @Override
- public List<Thread> run() throws Exception {
- Thread t;
- List<Thread> list = new ArrayList<Thread>(6);
- list.add(newDaemonThread(new ServiceExpire(RegistrarImpl.this), "service expire"));
- list.add(newDaemonThread(new EventExpire(RegistrarImpl.this),"event expire"));
- unicast = new Unicast(RegistrarImpl.this, unicastPort);
- list.add(newInterruptStatusThread(unicast, "unicast request"));
- list.add(newInterruptStatusThread(new Multicast(RegistrarImpl.this), "multicast request"));
- list.add(newDaemonThread(new Announce(RegistrarImpl.this),"discovery announcement"));
- list.add(newDaemonThread(new Snapshot(RegistrarImpl.this),"snapshot thread"));
- return list;
- }
-
- private Thread newDaemonThread(Runnable r, String name){
- Thread t = new Thread(r,name);
- t.setDaemon(true);
- return t;
- }
-
- private Thread newInterruptStatusThread(Runnable r, String name){
- Thread t = new InterruptedStatusThread(r,name);
- t.setDaemon(true);
- return t;
- }
-
- }, context);
- serviceExpirer = threads.get(0);
- eventExpirer = threads.get(1);
- unicaster = threads.get(2);
- multicaster = threads.get(3);
- announcer = threads.get(4);
- snapshotter = threads.get(5);
- if (init.persistent){
- log = new ReliableLog(init.persistenceDirectory, new LocalLogHandler(this));
- if (logger.isLoggable(Level.CONFIG)) {
- logger.log(Level.CONFIG, "using persistence directory {0}",
- new Object[]{ init.persistenceDirectory });
- }
- } else {
- log = null;
- }
-
- constructionException = null;
- } catch (PrivilegedActionException ex) {
- constructionException = ex.getException();
- } catch (IOException ex) {
- constructionException = ex;
- } finally {
- this.log = log;
- this.serviceExpirer = serviceExpirer;
- this.eventExpirer = eventExpirer;
- this.unicaster = unicaster;
- this.multicaster = multicaster;
- this.announcer = announcer;
- this.snapshotter = snapshotter;
- }
- multicastRequestSubjectChecker = init.multicastRequestSubjectChecker;
- loginContext = init.loginContext;
- unicastDiscoveryHost = init.unicastDiscoveryHost;
- config = init.config;
- }
-
- /** A service item registration record. */
- private final static class SvcReg implements Comparable, Serializable {
-
- private static final long serialVersionUID = 2L;
-
- /**
- * The service item.
- *
- * @serial
- */
- public final Item item;
- /**
- * The lease id.
- *
- * @serial
- */
- public final Uuid leaseID;
- /**
- * The lease expiration time.
- *
- * @serial
- */
- public volatile long leaseExpiration;
-
- /** Simple constructor */
- public SvcReg(Item item, Uuid leaseID, long leaseExpiration) {
- this.item = item;
- this.leaseID = leaseID;
- this.leaseExpiration = leaseExpiration;
- }
-
- /**
- * Primary sort by leaseExpiration, secondary by leaseID. The
- * secondary sort is immaterial, except to ensure a total order
- * (required by TreeMap).
- */
- public int compareTo(Object obj) {
- SvcReg reg = (SvcReg)obj;
- if (this == reg)
- return 0;
- int i = compare(leaseExpiration, reg.leaseExpiration);
- if (i != 0) {
- return i;
- }
- i = compare(leaseID.getMostSignificantBits(),
- reg.leaseID.getMostSignificantBits());
- if (i != 0) {
- return i;
- }
- return compare(leaseID.getLeastSignificantBits(),
- reg.leaseID.getLeastSignificantBits());
- }
-
- /**
- * Compares long values, returning -1, 0, or 1 if l1 is less than,
- * equal to or greater than l2, respectively.
- */
- private static int compare(long l1, long l2) {
- return (l1 < l2) ? -1 : ((l1 > l2) ? 1 : 0);
- }
- }
-
- /** An event registration record. */
- private final static class EventReg implements Comparable, Serializable {
-
- private static final long serialVersionUID = 2L;
-
- /**
- * The event id.
- * @serial
- */
- public final long eventID;
- /**
- * The lease id.
- * @serial
- */
- public final Uuid leaseID;
- /**
- * The template to match.
- * @serial
- */
- public final Template tmpl;
- /**
- * The transitions.
- *
- * @serial
- */
- public final int transitions;
- /**
- * The current sequence number.
- *
- * @serial
- */
- public long seqNo;
- /**
- * The event listener.
- */
- public transient RemoteEventListener listener;
- /**
- * The handback object.
- *
- * @serial
- */
- public final MarshalledObject handback;
- /**
- * The lease expiration time.
- *
- * @serial
- */
- private long leaseExpiration;
-
- /** Simple constructor */
- public EventReg(long eventID, Uuid leaseID, Template tmpl,
- int transitions, RemoteEventListener listener,
- MarshalledObject handback, long leaseExpiration) {
- this.eventID = eventID;
- this.leaseID = leaseID;
- this.tmpl = tmpl;
- this.transitions = transitions;
- this.seqNo = 0;
- this.listener = listener;
- this.handback = handback;
- this.leaseExpiration = leaseExpiration;
- }
-
- synchronized long incrementSeqNo(long increment){
- seqNo += increment;
- return seqNo;
- }
-
- synchronized long incrementAndGetSeqNo(){
- return ++seqNo;
- }
-
- synchronized long getSeqNo(){
- return seqNo;
- }
-
- /**
- * Primary sort by leaseExpiration, secondary by eventID. The
- * secondary sort is immaterial, except to ensure a total order
- * (required by TreeMap).
- */
- public int compareTo(Object obj) {
- EventReg reg = (EventReg)obj;
- if (this == reg)
- return 0;
- if (getLeaseExpiration() < reg.getLeaseExpiration() ||
- (getLeaseExpiration() == reg.getLeaseExpiration() &&
- eventID < reg.eventID))
- return -1;
- return 1;
- }
-
- /**
- * Prepares listener (if non-null) using the given proxy preparer. If
- * preparation fails, the listener field is set to null.
- */
- public void prepareListener(ProxyPreparer preparer) {
- if (listener != null) {
- try {
- listener =
- (RemoteEventListener) preparer.prepareProxy(listener);
- } catch (Exception e) {
- if (logger.isLoggable(Level.WARNING)) {
- logThrow(
- Level.WARNING,
- getClass().getName(),
- "prepareListener",
- "failed to prepare event listener {0}",
- new Object[]{ listener },
- e);
- }
- listener = null;
- }
- }
- }
-
- /**
- * @serialData RemoteEventListener as a MarshalledInstance
- */
- private void writeObject(ObjectOutputStream stream)
- throws IOException
- {
- stream.defaultWriteObject();
- stream.writeObject(new MarshalledInstance(listener));
- }
-
- /**
- * Unmarshals the event listener.
- */
- private void readObject(ObjectInputStream stream)
- throws IOException, ClassNotFoundException
- {
- stream.defaultReadObject();
- MarshalledInstance mi = (MarshalledInstance) stream.readObject();
- try {
- listener = (RemoteEventListener) mi.get(false);
- } catch (Throwable e) {
- if (e instanceof Error &&
- ThrowableConstants.retryable(e) ==
- ThrowableConstants.BAD_OBJECT)
- {
- throw (Error) e;
- }
- logger.log(Level.WARNING,
- "failed to recover event listener", e);
- }
- }
-
- /**
- * @return the leaseExpiration
- */
- synchronized long getLeaseExpiration() {
- return leaseExpiration;
- }
-
- /**
- * @param leaseExpiration the leaseExpiration to set
- */
- synchronized void setLeaseExpiration(long leaseExpiration) {
- this.leaseExpiration = leaseExpiration;
- }
- }
-
- /**
- * Interface defining the method(s) that must be implemented by each of
- * the concrete LogObj classes. This allows for the definition of
- * object-dependent invocations of the appropriate implementation of
- * the method(s) declared in this interface.
- */
- private static interface LogRecord extends Serializable {
- void apply(RegistrarImpl regImpl);
- }
-
- /**
- * LogObj class whose instances are recorded to the log file whenever
- * a new service is registered.
- *
- * @see RegistrarImpl.LocalLogHandler
- */
- private static class SvcRegisteredLogObj implements LogRecord {
-
- private static final long serialVersionUID = 2L;
-
- /**
- * The service registration.
- *
- * @serial
- */
- private final SvcReg reg;
-
- /** Simple constructor */
- public SvcRegisteredLogObj(SvcReg reg) {
- this.reg = reg;
- }
-
- /**
- * Modifies the state of the Registrar by registering the service
- * stored in the reg object. Also needs to delete any existing
- * service with the same serviceID; this can happen if a service
- * re-registers while an existing registration is in effect, because
- * we don't log a separate lease cancellation record for the existing
- * registration in that case.
- *
- * @see RegistrarImpl.LocalLogHandler#applyUpdate
- */
- public void apply(RegistrarImpl regImpl) {
- regImpl.concurrentObj.writeLock();
- try {
- SvcReg oldReg =
- (SvcReg)regImpl.serviceByID.get(reg.item.serviceID);
- if (oldReg != null)
- regImpl.deleteService(oldReg, 0);
- regImpl.addService(reg);
- } finally {
- regImpl.concurrentObj.writeUnlock();
- }
- }
- }
-
- /**
- * LogObj class whose instances are recorded to the log file whenever
- * new attributes are added to an existing service in the Registrar.
- *
- * @see RegistrarImpl.LocalLogHandler
- */
- private static class AttrsAddedLogObj implements LogRecord {
-
- private static final long serialVersionUID = 2L;
-
- /**
- * The service id.
- *
- * @serial
- */
- private final ServiceID serviceID;
- /**
- * The lease id.
- *
- * @serial
- */
- private final Uuid leaseID;
- /**
- * The attributes added.
- *
- * @serial
- */
- private final EntryRep[] attrSets;
-
- /** Simple constructor */
- public AttrsAddedLogObj(ServiceID serviceID,
- Uuid leaseID,
- EntryRep[] attrSets)
- {
- this.serviceID = serviceID;
- this.leaseID = leaseID;
- this.attrSets = attrSets;
- }
-
- /**
- * Modifies the state of the Registrar by adding to all of the
- * services matching the template, the attributes stored in
- * attributeSets.
- *
- * @see RegistrarImpl.LocalLogHandler#applyUpdate
- */
- public void apply(RegistrarImpl regImpl) {
- regImpl.concurrentObj.writeLock();
- try {
- regImpl.addAttributesDo(serviceID, leaseID, attrSets);
- } catch (UnknownLeaseException e) {
- /* this exception should never occur when recovering */
- throw new AssertionError("an UnknownLeaseException should"
- + " never occur during recovery");
- } finally {
- regImpl.concurrentObj.writeUnlock();
- }
- }
- }
-
- /**
- * LogObj class whose instances are recorded to the log file whenever
- * existing attributes of an existing service in the Registrar are
- * modified.
- *
- * @see RegistrarImpl.LocalLogHandler
- */
- private static class AttrsModifiedLogObj implements LogRecord {
-
- private static final long serialVersionUID = 2L;
-
- /**
- * The service id.
- *
- * @serial
- */
- private final ServiceID serviceID;
- /**
- * The lease id.
- *
- * @serial
- */
- private final Uuid leaseID;
- /**
- * The templates to match.
- * @serial
- */
- private final EntryRep[] attrSetTmpls;
- /**
- * The new attributes.
- *
- * @serial
- */
- private final EntryRep[] attrSets;
-
- /** Simple constructor */
- public AttrsModifiedLogObj(ServiceID serviceID,
- Uuid leaseID,
- EntryRep[] attrSetTmpls,
- EntryRep[] attrSets)
- {
- this.serviceID = serviceID;
- this.leaseID = leaseID;
- this.attrSetTmpls = attrSetTmpls;
- this.attrSets = attrSets;
- }
-
- /**
- * Modifies the state of the Registrar by modifying the attributes
- * of the services that match the template with the attributes
- * stored in attributeSets.
- *
- * @see RegistrarImpl.LocalLogHandler#applyUpdate
- */
- public void apply(RegistrarImpl regImpl) {
- regImpl.concurrentObj.writeLock();
- try {
- regImpl.modifyAttributesDo(serviceID, leaseID,
- attrSetTmpls, attrSets);
- } catch (UnknownLeaseException e) {
- /* this exception should never occur when recovering */
- throw new AssertionError("an UnknownLeaseException should"
- + " never occur during recovery");
- } finally {
- regImpl.concurrentObj.writeUnlock();
- }
- }
- }
-
- /**
- * LogObj class whose instances are recorded to the log file whenever
- * new attributes are set on an existing service in the Registrar.
- *
- * @see RegistrarImpl.LocalLogHandler
- */
- private static class AttrsSetLogObj implements LogRecord {
-
- private static final long serialVersionUID = 2L;
-
- /**
- * The service id.
- *
- * @serial
- */
- private final ServiceID serviceID;
- /**
- * The lease id.
- *
- * @serial
- */
- private final Uuid leaseID;
- /**
- * The new attributes.
- *
- * @serial
- */
- private final EntryRep[] attrSets;
-
- /** Simple constructor */
- public AttrsSetLogObj(ServiceID serviceID,
- Uuid leaseID,
- EntryRep[] attrSets)
- {
- this.serviceID = serviceID;
- this.leaseID = leaseID;
- this.attrSets = attrSets;
- }
-
- /**
- * Modifies the state of the Registrar by replacing the attributes
- * of the services matching the template with the attributes stored
- * in attributeSets.
- *
- * @see RegistrarImpl.LocalLogHandler#applyUpdate
- */
- public void apply(RegistrarImpl regImpl) {
- regImpl.concurrentObj.writeLock();
- try {
- regImpl.setAttributesDo(serviceID, leaseID, attrSets);
- } catch (UnknownLeaseException e) {
- /* this exception should never occur when recovering */
- } finally {
- regImpl.concurrentObj.writeUnlock();
- }
- }
- }
-
- /**
- * LogObj class whose instances are recorded to the log file whenever
- * a new event is registered.
- *
- * @see RegistrarImpl.LocalLogHandler
- */
- private static class EventRegisteredLogObj implements LogRecord {
-
- private static final long serialVersionUID = 2L;
-
- /**
- * The event registration.
- *
- * @serial
- */
- private final EventReg eventReg;
-
- /** Simple constructor */
- public EventRegisteredLogObj(EventReg eventReg) {
- this.eventReg = eventReg;
- }
-
- /**
- * Modifies the state of the Registrar by registering the event
- * stored in the eventReg object; and by updating both the event
- * sequence number and the event ID.
- *
- * @see RegistrarImpl.LocalLogHandler#applyUpdate
- */
- public void apply(RegistrarImpl regImpl) {
- synchronized (eventReg){ // Atomic
- eventReg.prepareListener(regImpl.recoveredListenerPreparer);
- eventReg.incrementSeqNo(Integer.MAX_VALUE);
- }
- regImpl.concurrentObj.writeLock();
- try{
- regImpl.addEvent(eventReg);
- regImpl.eventID++;
- } finally {
- regImpl.concurrentObj.writeUnlock();
- }
- }
- }
-
-
-
- /**
- * LogObj class whose instances are recorded to the log file whenever
- * a lease on an existing service in the Registrar is cancelled.
- *
- * @see RegistrarImpl.LocalLogHandler
- */
- private static class ServiceLeaseCancelledLogObj implements LogRecord {
-
- private static final long serialVersionUID = 2L;
-
- /**
- * The service id.
- *
- * @serial
- */
- private final ServiceID serviceID;
- /**
- * The lease id.
- *
- * @serial
- */
- private final Uuid leaseID;
-
- /** Simple constructor */
- public ServiceLeaseCancelledLogObj(ServiceID serviceID, Uuid leaseID) {
- this.serviceID = serviceID;
- this.leaseID = leaseID;
- }
-
- /**
- * Modifies the state of the Registrar by cancelling the lease
- * having ID equal to the contents of the leaseID field; and
- * corresponding to the service with ID equal to the contents of
- * the serviceID field.
- *
- * @see RegistrarImpl.LocalLogHandler#applyUpdate
- */
- public void apply(RegistrarImpl regImpl) {
- regImpl.concurrentObj.writeLock();
- try {
- regImpl.cancelServiceLeaseDo(serviceID, leaseID);
- } catch (UnknownLeaseException e) {
- /* this exception should never occur when recovering */
- } finally {
- regImpl.concurrentObj.writeUnlock();
- }
- }
- }
-
- /**
- * LogObj class whose instances are recorded to the log file whenever
- * a lease on an existing service in the Registrar is renewed.
- *
- * @see RegistrarImpl.LocalLogHandler
- */
- private static class ServiceLeaseRenewedLogObj implements LogRecord {
-
- private static final long serialVersionUID = 2L;
-
- /**
- * The service id.
- *
- * @serial
- */
- private final ServiceID serviceID;
- /**
- * The lease id.
- *
- * @serial
- */
- private final Uuid leaseID;
- /**
- * The new lease expiration time.
- *
- * @serial
- */
- private final long leaseExpTime;
-
- /** Simple constructor */
- public ServiceLeaseRenewedLogObj(ServiceID serviceID,
- Uuid leaseID,
- long leaseExpTime)
- {
- this.serviceID = serviceID;
- this.leaseID = leaseID;
- this.leaseExpTime = leaseExpTime;
- }
-
- /**
- * Modifies the state of the Registrar by renewing the lease
- * having ID equal to the contents of the leaseID field; and
- * corresponding to the service with ID equal to the contents
- * of the serviceID field.
- *
- * @see RegistrarImpl.LocalLogHandler#applyUpdate
- */
- public void apply(RegistrarImpl regImpl) {
- regImpl.renewServiceLeaseAbs(serviceID, leaseID, leaseExpTime);
- }
- }
-
- /**
- * LogObj class whose instances are recorded to the log file whenever
- * a lease on a registered event is cancelled.
- *
- * @see RegistrarImpl.LocalLogHandler
- */
- private static class EventLeaseCancelledLogObj implements LogRecord {
-
- private static final long serialVersionUID = 2L;
-
- /**
- * The event id.
- *
- * @serial
- */
- private final long eventID;
- /**
- * The lease id.
- *
- * @serial
- */
- private final Uuid leaseID;
-
- /** Simple constructor */
- public EventLeaseCancelledLogObj(long eventID, Uuid leaseID) {
- this.eventID = eventID;
- this.leaseID = leaseID;
- }
-
- /**
- * Modifies the state of the Registrar by cancelling the lease
- * corresponding to the event with ID equal to the contents of
- * the eventID field.
- *
- * @see RegistrarImpl.LocalLogHandler#applyUpdate
- */
- public void apply(RegistrarImpl regImpl) {
- regImpl.concurrentObj.writeLock();
- try {
- regImpl.cancelEventLeaseDo(eventID, leaseID);
- } catch (UnknownLeaseException e) {
- /* this exception should never occur when recovering */
- } finally {
- regImpl.concurrentObj.writeUnlock();
- }
- }
- }
-
- /**
- * LogObj class whose instances are recorded to the log file whenever
- * a lease on a registered event is renewed.
- *
- * @see RegistrarImpl.LocalLogHandler
- */
- private static class EventLeaseRenewedLogObj implements LogRecord {
-
- private static final long serialVersionUID = 2L;
-
- /**
- * The event id.
- *
- * @serial
- */
- private final long eventID;
- /**
- * The lease id.
- *
- * @serial
- */
- private final Uuid leaseID;
- /**
- * The new lease expiration time.
- *
- * @serial
- */
- private final long leaseExpTime;
-
- /** Simple constructor */
- public EventLeaseRenewedLogObj(long eventID,
- Uuid leaseID,
- long leaseExpTime)
- {
- this.eventID = eventID;
- this.leaseID = leaseID;
- this.leaseExpTime = leaseExpTime;
- }
-
- /**
- * Modifies the state of the Registrar by renewing the lease
- * corresponding to the event with ID equal to the contents of
- * the eventID field.
- *
- * @see RegistrarImpl.LocalLogHandler#applyUpdate
- */
- public void apply(RegistrarImpl regImpl) {
- regImpl.renewEventLeaseAbs(eventID, leaseID, leaseExpTime);
- }
- }
-
- /**
- * LogObj class whose instances are recorded to the log file whenever
- * a leases in the Registrar is renewed via a LeaseMap.
- *
- * @see RegistrarImpl.LocalLogHandler
- */
- private static class LeasesRenewedLogObj implements LogRecord {
-
- private static final long serialVersionUID = 2L;
-
- /**
- * The service and event ids.
- *
- * @serial
- */
- private final Object[] regIDs;
- /**
- * The lease ids.
- *
- * @serial
- */
- private final Uuid[] leaseIDs;
- /**
- * The new lease expiration times.
- *
- * @serial
- */
- private final long[] leaseExpTimes;
-
- /** Simple constructor */
- public LeasesRenewedLogObj(Object[] regIDs,
- Uuid[] leaseIDs,
- long[] leaseExpTimes)
- {
- this.regIDs = regIDs;
- this.leaseIDs = leaseIDs;
- this.leaseExpTimes = leaseExpTimes;
- }
-
- /**
- * Modifies the state of the Registrar by renewing the specified
- * leases.
- *
- * @see RegistrarImpl.LocalLogHandler#applyUpdate
- */
- public void apply(RegistrarImpl regImpl) {
- regImpl.renewLeasesAbs(regIDs, leaseIDs, leaseExpTimes);
- }
- }
-
- /**
- * LogObj class whose instances are recorded to the log file whenever
- * lease are cancelled via a LeaseMap.
- *
- * @see RegistrarImpl.LocalLogHandler
- */
- private static class LeasesCancelledLogObj implements LogRecord {
-
- private static final long serialVersionUID = 2L;
-
- /**
- * The service and event ids.
- *
- * @serial
- */
- private final Object[] regIDs;
- /**
- * The lease ids.
- *
- * @serial
- */
- private final Uuid[] leaseIDs;
-
- /** Simple constructor */
- public LeasesCancelledLogObj(Object[] regIDs, Uuid[] leaseIDs) {
- this.regIDs = regIDs;
- this.leaseIDs = leaseIDs;
- }
-
- /**
- * Modifies the state of the Registrar by cancelling the specified
- * leases.
- *
- * @see RegistrarImpl.LocalLogHandler#applyUpdate
- */
- public void apply(RegistrarImpl regImpl) {
- /* Exceptions can be returned, since we didn't weed out unknown
- * leases before logging, but we can just ignore them anyway.
- */
- regImpl.concurrentObj.writeLock();
- try {
- regImpl.cancelLeasesDo(regIDs, leaseIDs);
- } finally {
- regImpl.concurrentObj.writeUnlock();
- }
- }
- }
-
- /**
- * LogObj class whose instances are recorded to the log file whenever
- * the Unicast Port Number is set to a new value.
- * <p>
- * Note: the apply() method of this class merely sets the private field
- * unicastPort. This means that during a recovery, the unicaster
- * thread will be created with this new port number ONLY IF that
- * thread is created AFTER recovery is complete. Thus, it is
- * important that at re-initialization during a re-activation
- * of the Registrar, the recovery() method is invoked before
- * the unicaster thread is created.
- *
- * @see RegistrarImpl.LocalLogHandler
- */
- private static class UnicastPortSetLogObj implements LogRecord {
-
- private static final long serialVersionUID = 2L;
-
- /**
- * The new port number.
- *
- * @serial
- */
- private final int newPort;
-
- /** Simple constructor */
- public UnicastPortSetLogObj(int newPort) {
- this.newPort = newPort;
- }
-
- /**
- * Modifies the state of the Registrar by setting the value of the
- * private unicastPort field to the value of the newPort field.
- *
- * @see RegistrarImpl.LocalLogHandler#applyUpdate
- */
- public void apply(RegistrarImpl regImpl) {
- regImpl.unicastPort = newPort;
- }
- }
-
- /**
- * LogObj class whose instances are recorded to the log file whenever
- * the set of groups to join is changed.
- *
- * @see RegistrarImpl.LocalLogHandler
- */
- private static class LookupGroupsChangedLogObj implements LogRecord {
-
- private static final long serialVersionUID = 2L;
-
- /**
- * The new groups to join.
- *
- * @serial
- */
- private final String[] groups;
-
- /** Simple constructor */
- public LookupGroupsChangedLogObj(String[] groups) {
- this.groups = groups;
- }
-
- /**
- * Modifies the state of the Registrar by setting the private
- * field lookupGroups to the reference to the groups field.
- *
- * @see RegistrarImpl.LocalLogHandler#applyUpdate
- */
- public void apply(RegistrarImpl regImpl) {
- regImpl.lookupGroups = groups;
- }
- }
-
- /**
- * LogObj class whose instances are recorded to the log file whenever
- * the set of locators of lookup services to join is changed.
- *
- * @see RegistrarImpl.LocalLogHandler
- */
- private static class LookupLocatorsChangedLogObj implements LogRecord {
-
- private static final long serialVersionUID = 2L;
-
- /**
- * The new locators to join.
- */
- private transient LookupLocator[] locators;
-
- /** Simple constructor */
- public LookupLocatorsChangedLogObj(LookupLocator[] locators) {
- this.locators = locators;
- }
-
- /**
- * Modifies the state of the Registrar by setting the private
- * field lookupLocators to the reference to the locators field.
- *
- * @see RegistrarImpl.LocalLogHandler#applyUpdate
- */
- public void apply(RegistrarImpl regImpl) {
- try {
- regImpl.lookupLocators = prepareLocators(
- locators, regImpl.recoveredLocatorPreparer, true);
- } catch (RemoteException e) {
- throw new AssertionError(e);
- }
- }
-
- /**
- * Writes locators as a null-terminated list of MarshalledInstances.
- */
- private void writeObject(ObjectOutputStream stream)
- throws IOException
- {
- stream.defaultWriteObject();
- marshalLocators(locators, stream);
- }
-
- /**
- * Reads in null-terminated list of MarshalledInstances, from which
- * locators are unmarshalled.
- */
- private void readObject(ObjectInputStream stream)
- throws IOException, ClassNotFoundException
- {
- stream.defaultReadObject();
- locators = unmarshalLocators(stream);
- }
- }
-
- /**
- * LogObj class whose instances are recorded to the log file whenever
- * the memberGroups array is set to reference a new array of strings.
- *
- * @see RegistrarImpl.LocalLogHandler
- */
- private static class MemberGroupsChangedLogObj implements LogRecord {
-
- private static final long serialVersionUID = 2L;
-
- /**
- * The new groups to be a member of.
- *
- * @serial
- */
- private final String[] groups;
-
- /** Simple constructor */
- public MemberGroupsChangedLogObj(String[] groups) {
- this.groups = groups;
- }
-
- /**
- * Modifies the state of the Registrar by setting the private
- * memberGroups field to the reference to the groups field.
- *
- * @see RegistrarImpl.LocalLogHandler#applyUpdate
- */
- public void apply(RegistrarImpl regImpl) {
- regImpl.concurrentObj.writeLock();
- try {
- regImpl.memberGroups = groups;
- } finally {
- regImpl.concurrentObj.writeUnlock();
- }
- }
- }
-
- /**
- * LogObj class whose instances are recorded to the log file whenever
- * the attributes for the lookup service are changed.
- *
- * @see RegistrarImpl.LocalLogHandler
- */
- private static class LookupAttributesChangedLogObj implements LogRecord {
-
- private static final long serialVersionUID = 1L;
-
- /**
- * The new lookup service attributes.
- */
- private transient Entry[] attrs;
-
- /** Simple constructor */
- public LookupAttributesChangedLogObj(Entry[] attrs) {
- this.attrs = attrs;
- }
-
- /**
- * Modifies the state of the Registrar by setting the private
- * field lookupAttrs to the reference to the attrs field.
- *
- * @see RegistrarImpl.LocalLogHandler#applyUpdate
- */
- public void apply(RegistrarImpl regImpl) {
- regImpl.lookupAttrs = attrs;
- }
-
- /**
- * Writes attributes as a null-terminated list of MarshalledInstances.
- */
- private void writeObject(ObjectOutputStream stream)
- throws IOException
- {
- stream.defaultWriteObject();
- marshalAttributes(attrs, stream);
- }
-
- /**
- * Reads in null-terminated list of MarshalledInstances, from which
- * attributes are unmarshalled.
- */
- private void readObject(ObjectInputStream stream)
- throws IOException, ClassNotFoundException
- {
- stream.defaultReadObject();
- attrs = unmarshalAttributes(stream);
- }
- }
-
- /**
- * Handler class for the persistent storage facility.
- * <p>
- * At any point during processing in a persistent Registrar instance, there
- * will exist both a 'snapshot' of the Registrar's state and a set of
- * records detailing each significant change that has occurred to the state
- * since the snapshot was taken. The snapshot information and the
- * incremental change information will be stored in separate files called,
- * respectively, the snapshot file and the log file. Together, these files
- * are used to recover the state of the Registrar when it is restarted or
- * reactivated (for example, after a crash or network outage).
- * <p>
- * This class contains the methods that are used to record and recover
- * the snapshot of the Registrar's state; as well as the method used to
- * apply the state changes that were recorded in the log file.
- * <p>
- * When the ReliableLog class is instantiated, a new instance of this
- * class is passed to its constructor so that the methods of this
- * class may be invoked by the methods defined in the ReliableLog.
- * Because this class extends the LogHandler class associated with
- * the ReliableLog class, this class must provide implementations of
- * the abstract methods declared in the LogHandler. Also, some of the
- * methods defined in this class override the methods of the LogHandler
- * in order to customize the handling of snapshot creation and
- * retrieval.
- * <p>
- * Each significant change to the persistent Registrar's state is written
- * to the log file as an individual record (when addLogRecord() is
- * invoked). After the number of records logged exceeds a pre-defined
- * threshold, a snapshot of the state is recorded by invoking -- through
- * the ReliableLog and its LogHandler -- the snapshot() method defined in
- * this class. After the snapshot is taken, the log file is cleared and the
- * incremental log process starts over.
- * <p>
- * The contents of the snapshot file reflect the DATA contained in
- * the fields making up the current state of the Registrar. That data
- * represents many changes -- over time -- to the Registrar's state.
- * On the other hand, each record written to the log file is an object
- * that reflects both the data used and the ACTIONS taken to make one
- * change to the Registrar's state at a particular point in time.
- * <p>
- * The data written to the snapshot file is shown below:
- * <ul>
- * <li> our service ID
- * <li> current event ID
- * <li> current values of administrable parameters
- * <li> contents of the container holding the current registered services
- * <li> null (termination 'marker' for the set of registered services)
- * <li> contents of the container holding the current registered events
- * <li> null (termination 'marker' for the set of registered events)
- * </ul>
- * The type of state changes that will generate a new record in the log
- * file are:
- * <ul>
- * <li> a new service was registered
- * <li> a new event was registered
- * <li> new attributes were added to an existing service
- * <li> existing attributes of a service were modified
- * <li> a service lease was cancelled
- * <li> a service lease was renewed
- * <li> an event lease was cancelled
- * <li> an event lease was renewed
- * <li> an administrable parameter was changed
- * </ul>
- * During recovery, the state of the Registrar at the time of a crash
- * or outage is re-constructed by first retrieving the 'base' state from
- * the snapshot file; and then modifying that base state according to
- * the records retrieved from the log file. The reconstruction of the
- * base state is achieved by invoking the recover() method defined in
- * this class. The modifications recorded in the log file are then
- * applied to the base state by invoking the applyUpdate() method
- * defined in this class. Both recover() and applyUpdate() are invoked
- * through the ReliableLog and its associated LogHandler.
- * <p>
- * NOTE: The following lines must be added to the Registrar's policy file
- * <pre>
- * permission java.io.FilePermission "dirname", "read,write,delete";
- * permission java.io.FilePermission "dirname/-", "read,write,delete";
- * </pre>
- * where 'dirname' is the name of the directory path (relative or
- * absolute) where the snapshot and log file will be maintained.
- */
- private static class LocalLogHandler extends LogHandler {
- private final RegistrarImpl reggie;
- /** Simple constructor */
- public LocalLogHandler(RegistrarImpl reggie) {
- this.reggie = reggie;
- }
-
- /* Overrides snapshot() defined in ReliableLog's LogHandler class. */
- public void snapshot(OutputStream out) throws IOException {
- reggie.concurrentObj.readLock();
- try {
- reggie.takeSnapshot(out);
- } finally {
- reggie.concurrentObj.readUnlock();
- }
- }
-
- /* Overrides recover() defined in ReliableLog's LogHandler class. */
- public void recover(InputStream in)
- throws IOException, ClassNotFoundException
- {
- reggie.concurrentObj.writeLock();
- try {
- reggie.recoverSnapshot(in);
- } finally {
- reggie.concurrentObj.writeUnlock();
- }
- }
-
- /**
- * Required method implementing the abstract applyUpdate()
- * defined in ReliableLog's associated LogHandler class.
- * <p>
- * During state recovery, the recover() method defined in the
- * ReliableLog class is invoked. That method invokes the method
- * recoverUpdates() which invokes the method readUpdates(). Both
- * of those methods are defined in ReliableLog. The method
- * readUpdates() retrieves a record from the log file and then
- * invokes this method.
- * <p>
- * This method invokes the version of the method apply() that
- * corresponds to the particular type of 'log record' object
- * that is input as the first argument. The log record object and its
- * corresponding apply() method are defined in one of the so-called
- * LogObj classes. Any instance of one the LogObj classes is an
- * implementation of the LogRecord interface. The particular
- * implementation that is input to this method is dependent on the
- * type of record that was originally logged. The apply() method
- * will then modify the state of the Registrar in a way dictated
- * by the type of record that was retrieved.
- */
- public void applyUpdate(Object logRecObj) {
- ((LogRecord)logRecObj).apply(reggie);
- }
- }
-
- /** Base class for iterating over all Items that match a Template. */
- private static abstract class ItemIter {
- /** Current time */
- public final long now = System.currentTimeMillis();
- /** True means duplicate items are possible */
- public boolean dupsPossible = false;
- /** Template to match */
- protected final Template tmpl;
- /** Next item to return */
- protected SvcReg reg;
-
- /** Subclass constructors must initialize reg */
- protected ItemIter(Template tmpl) {
- this.tmpl = tmpl;
- }
-
- /** Returns true if the iteration has more elements. */
- public boolean hasNext() {
- return reg != null;
- }
-
- /** Returns the next element in the iteration as an Item. */
- public Item next() {
- if (reg == null)
- throw new NoSuchElementException();
- Item item = reg.item;
- step();
- return item;
- }
-
- /** Returns the next element in the iteration as a SvcReg. */
- public SvcReg nextReg() {
- if (reg == null)
- throw new NoSuchElementException();
- SvcReg cur = reg;
- step();
- return cur;
- }
-
- /** Set reg to the next matching element, or null if none */
- protected abstract void step();
- }
-
- /** Iterate over all Items. */
- private static class AllItemIter extends ItemIter {
- /** Iterator over serviceByID */
- private final Iterator<SvcReg> iter;
-
- /** Assumes the empty template */
- public AllItemIter(Iterator<SvcReg> it) {
- super(null);
- iter = it;
- step();
- }
-
- /** Set reg to the next matching element, or null if none */
- @Override
- protected void step() {
- while (iter.hasNext()) {
- reg = (SvcReg)iter.next();
- if (reg.leaseExpiration > now)
- return;
- }
- reg = null;
- }
- }
-
- /** Iterates over all services that match template's service types */
- private static class SvcIterator extends ItemIter {
- /** Iterator for list of matching services. */
- private final Iterator<SvcReg> services;
-
- /**
- * tmpl.serviceID == null and
- * tmpl.serviceTypes is not empty
- */
- public SvcIterator(Template tmpl, Iterator<SvcReg> it) {
- super(tmpl);
- services = it;
- step();
- }
-
- /** Set reg to the next matching element, or null if none. */
- protected final void step() {
- if (tmpl.serviceTypes.length > 1) {
- while (services.hasNext()) {
- reg = (SvcReg) services.next();
- if (reg.leaseExpiration > now &&
- matchType(tmpl.serviceTypes, reg.item.serviceType) &&
- matchAttributes(tmpl, reg.item))
- return;
- }
- } else {
- while (services.hasNext()) {
- reg = (SvcReg) services.next();
- if (reg.leaseExpiration > now &&
- matchAttributes(tmpl, reg.item))
- return;
- }
- }
- reg = null;
- }
- }
-
- /** Iterate over all matching Items by attribute value. */
- private static class AttrItemIter extends ItemIter {
- /** SvcRegs obtained from serviceByAttr for chosen attr */
- private final List<SvcReg> svcs;
- /** Current index into svcs */
- private int svcidx;
-
- /**
- * tmpl.serviceID == null and
- * tmpl.serviceTypes is empty and
- * tmpl.attributeSetTemplates[setidx].fields[fldidx] != null
- */
- public AttrItemIter(Template tmpl, List<SvcReg> svcs) {
- super(tmpl);
- this.svcs = svcs;
- if (svcs != null) {
- svcidx = svcs.size();
- step();
- }
- }
-
- /** Set reg to the next matching element, or null if none. */
- protected void step() {
- while (--svcidx >= 0) {
- reg = svcs.get(svcidx);
- if (reg.leaseExpiration > now
- && matchAttributes(tmpl, reg.item)) {
- return;
- }
- }
- reg = null;
- }
- }
-
- /** Iterate over all matching Items by entry class, dups possible. */
- private class ClassItemIter extends ItemIter {
- /** Entry class to match on */
- private final EntryClass eclass;
- /** Current index into entryClasses */
- private int classidx;
- /** Values iterator for current HashMap */
- private Iterator<List<SvcReg>> iter;
- /** SvcRegs obtained from iter or serviceByEmptyAttr */
- private List<SvcReg> svcs;
- /** Current index into svcs */
- private int svcidx = 0;
-
- /**
- * tmpl.serviceID == null and
- * tmpl.serviceTypes is empty and
- * tmpl.attributeSetTemplates is non-empty
- */
- public ClassItemIter(Template tmpl) {
- super(tmpl);
- dupsPossible = true;
- eclass = tmpl.attributeSetTemplates[0].eclass;
- classidx = entryClasses.size();
- step();
- }
-
- /** Set reg to the next matching element, or null if none */
- protected void step() {
- do {
- while (--svcidx >= 0) {
- reg = (SvcReg)svcs.get(svcidx);
- if (reg.leaseExpiration > now &&
- matchAttributes(tmpl, reg.item))
- return;
- }
- } while (stepValue());
- reg = null;
- }
-
- /**
- * Step to the next HashMap value, if any, reset svcs and svcidx,
- * and return false if everything exhausted.
- */
- private boolean stepValue() {
- while (true) {
- if (iter != null && iter.hasNext()) {
- svcs = (ArrayList)iter.next();
- svcidx = svcs.size();
- return true;
- }
- if (!stepClass())
- return false;
- if (iter == null)
- return true;
- }
- }
-
- /**
- * Step to the next matching entry class, if any, reset iter
- * using the HashMap for the last field of the class (and reset
- * (svcs and svcidx if the entry class has no fields), and
- * return false if everything exhausted.
- */
- private boolean stepClass() {
- while (--classidx >= 0) {
- EntryClass cand = entryClasses.get(classidx);
- if (!eclass.isAssignableFrom(cand))
- continue;
- if (cand.getNumFields() > 0) {
- cand = getDefiningClass(cand, cand.getNumFields() - 1);
- Map<Object,List<SvcReg>>[] attrMaps = serviceByAttr.get(cand);
- iter = attrMaps[attrMaps.length - 1].values().iterator();
- } else {
- iter = null;
- svcs = serviceByEmptyAttr.get(cand);
- svcidx = svcs.size();
- }
- return true;
- }
- return false;
- }
- }
-
- /** Iterate over a singleton matching Item by serviceID. */
- private static class IDItemIter extends ItemIter {
-
- /** tmpl.serviceID != null */
- public IDItemIter(Template tmpl, SvcReg reg) {
- super(tmpl);
- if (reg != null &&
- (reg.leaseExpiration <= now || !matchItem(tmpl, reg.item))) {
- reg = null;
- }
- this.reg = reg;
- }
-
- /** Set reg to null */
- protected void step() {
- reg = null;
- }
- }
-
- /** An event to be sent, and the listener to send it to. */
- private static final class EventTask implements Callable<Boolean>, Comparable<EventTask> {
-
- /** The event registration */
- private final EventReg reg;
- /** The sequence number of this event */
- private final long seqNo;
- /** The service id */
- private final ServiceID sid;
- /** The new state of the item, or null if deleted */
- private final Item item;
- /** The transition that fired */
- private final int transition;
-
- private final RegistrarProxy proxy;
- private final Registrar registrar;
- /* the time of the event */
- private final long now;
-
- /** Simple constructor, except increments reg.seqNo. */
- public EventTask(EventReg reg, ServiceID sid, Item item, int transition, RegistrarProxy proxy, Registrar registrar, long now)
- {
- this.reg = reg;
- seqNo = reg.incrementAndGetSeqNo();
- this.sid = sid;
- this.item = item;
- this.transition = transition;
- this.proxy = proxy;
- this.registrar = registrar;
- this.now = now;
- }
-
- /** Send the event */
- public Boolean call() throws Exception {
- if (logger.isLoggable(Level.FINE)) {
- logger.log(
- Level.FINE,
- "notifying listener {0} of event {1}",
- new Object[]{ reg.listener, Long.valueOf(reg.eventID) });
- }
- try {
- reg.listener.notify(new RegistrarEvent(proxy, reg.eventID,
- seqNo, reg.handback,
- sid, transition, item));
- return Boolean.TRUE;
- } catch (Throwable e) {
- switch (ThrowableConstants.retryable(e)) {
- case ThrowableConstants.BAD_OBJECT:
- if (e instanceof Error) {
- logger.log(
- Levels.HANDLED, "exception sending event", e);
- throw (Error) e;
- }
- case ThrowableConstants.BAD_INVOCATION:
- case ThrowableConstants.UNCATEGORIZED:
- /* If the listener throws UnknownEvent or some other
- * definite exception, we can cancel the lease.
- */
- logger.log(Level.INFO, "exception sending event", e);
- try {
- registrar.cancelEventLease(reg.eventID, reg.leaseID);
- } catch (UnknownLeaseException ee) {
- logger.log(
- Levels.HANDLED,
- "exception canceling event lease",
- e);
- } catch (RemoteException ee) {
- logger.log(
- Levels.HANDLED,
- "The server has been shutdown",
- e);
- }
- }
- return Boolean.FALSE;
- }
- }
-
- /** Keep events going to the same listener ordered.
- * Returns a positive integer if depends, 0 if
- * no dependency exists and a negative integer if
- * precedes.
- */
- public boolean dependsOn(EventTask obj) {
- if (obj == null) return false;
- if (reg.listener.equals(obj.reg.listener)) return true;
- return false;
- }
-
- /**
- * This is inconsistent with Object.equals, it is simply intended to
- * order tasks by priority.
- * @param o
- * @return
- */
- @Override
- public int compareTo(EventTask o) {
- if (this.now < o.now) return -1;
- if (this.now > o.now) return 1;
- return 0;
- }
- }
-
- /** Task for decoding multicast request packets. */
- private static final class DecodeRequestTask implements Runnable {
- /** The multicast packet to decode */
- private final DatagramPacket datagram;
- /** The decoder for parsing the packet */
- private final Discovery decoder;
- private final RegistrarImpl reggie;
- /* Ensures that no duplicate AddressTask is running */
- private final Set<AddressTask> runningTasks;
-
- public DecodeRequestTask(
- DatagramPacket datagram, Discovery decoder, RegistrarImpl reggie, Set<AddressTask> runningTasks)
- {
- this.datagram = datagram;
- this.decoder = decoder;
- this.reggie = reggie;
- this.runningTasks = runningTasks;
- }
-
- /**
- * Decodes this task's multicast request packet, spawning an
- * AddressTask if the packet satisfies the configured constraints,
- * matches this registrar's groups, and does not already contain this
- * registrar's service ID in its list of known registrars. This method
- * assumes that the protocol version of the request has already been
- * checked.
- */
- public void run() {
- MulticastRequest req;
- try {
- req = decoder.decodeMulticastRequest(
- datagram,
- reggie.multicastRequestConstraints.getUnfulfilledConstraints(),
- reggie.multicastRequestSubjectChecker, true);
- } catch (Exception e) {
- if (!(e instanceof InterruptedIOException) &&
- logger.isLoggable(Levels.HANDLED))
- {
- logThrow(
- Levels.HANDLED,
- getClass().getName(),
- "run",
- "exception decoding multicast request from {0}:{1}",
- new Object[]{
- datagram.getAddress(),
- Integer.valueOf(datagram.getPort()) },
- e);
- }
- return;
- }
- String[] groups = req.getGroups();
- if ((groups.length == 0 || overlap(reggie.memberGroups, groups)) &&
- indexOf(req.getServiceIDs(), reggie.myServiceID) < 0)
- {
- try {
- req.checkConstraints();
- } catch (Exception e) {
- if (!(e instanceof InterruptedIOException) &&
- logger.isLoggable(Levels.HANDLED))
- {
- logThrow(
- Levels.HANDLED,
- getClass().getName(),
- "run",
- "exception decoding multicast request from {0}:{1}",
- new Object[]{
- datagram.getAddress(),
- Integer.valueOf(datagram.getPort()) },
- e);
- }
- return;
- }
- AddressTask task =
- new AddressTask(req.getHost(), req.getPort(), reggie);
- if (runningTasks.add(task)) {
- try {
- task.run();
- } finally {
- runningTasks.remove(task);
- }
- }
- }
- }
- }
-
- /** Address for unicast discovery response. */
- private static final class AddressTask implements Runnable, Comparable<AddressTask> {
-
- /** The address */
- private final String host;
- /** The port */
- private final int port;
-
- private final RegistrarImpl reggie;
-
- private final int hash;
-
-
- /** Simple constructor */
- public AddressTask(
- String host, int port, RegistrarImpl reggie)
- {
- this.reggie = reggie;
- this.host = host;
- this.port = port;
- int hash = 3;
- hash = 37 * hash + (this.host != null ? this.host.hashCode() : 0);
- hash = 37 * hash + this.port;
- this.hash = hash;
- }
-
- /** Two tasks are equal if they have the same address and port */
- @Override
- public int hashCode() {
- return hash;
- }
-
- /** Two tasks are equal if they have the same address and port */
- public boolean equals(Object obj) {
- if (!(obj instanceof AddressTask))
- return false;
- AddressTask ua = (AddressTask)obj;
- return host.equals(ua.host) && port == ua.port;
- }
-
- /** Connect and then process a unicast discovery request */
- public void run() {
- InetAddress[] addr = new InetAddress[]{};
- try {
- try {
- addr = InetAddress.getAllByName(host);
- if (addr == null)
- addr = new InetAddress[]{};
- } catch (UnknownHostException e) {
- if (logger.isLoggable(Level.INFO)) {
- logThrow(
- Level.INFO,
- getClass().getName(),
- "run",
- "failed to resolve host {0};"
- + " connection will still be attempted",
- new Object[]{ host },
- e);
- }
- }
- long deadline = DiscoveryConstraints.process(
- reggie.rawUnicastDiscoveryConstraints).getConnectionDeadline(
- Long.MAX_VALUE);
- long now = System.currentTimeMillis();
- if (deadline <= now)
- throw new SocketTimeoutException("timeout expired before"
- + " connection attempt");
- long timeLeft = deadline - now;
- int timeout = timeLeft >= Integer.MAX_VALUE ?
- Integer.MAX_VALUE : (int) timeLeft;
- // attempt connection even if host name was not resolved
- int len = addr.length;
- if (len == 0) {
- attemptResponse(
- new InetSocketAddress(host, port), timeout);
- return;
- }
- for (int i = 0; i < len; i++) {
- try {
- attemptResponse(
- new InetSocketAddress(addr[i], port), timeout);
- return;
- } catch (Exception e) {
- if (logger.isLoggable(Levels.HANDLED)) {
- logThrow(Levels.HANDLED, getClass().getName(),
- "run", "exception responding to {0}:{1}",
- new Object[] {addr[i], Integer.valueOf(port)}
- , e);
- }
- }
- timeLeft = deadline - System.currentTimeMillis();
- timeout = timeLeft >= Integer.MAX_VALUE ?
- Integer.MAX_VALUE : (int) timeLeft;
- if (timeLeft <= 0)
- throw new SocketTimeoutException("timeout expired"
- + " before successful response");
- }
- } catch (Exception e) {
- if (logger.isLoggable(Level.INFO)) {
- logThrow(
- Level.INFO,
- getClass().getName(),
- "run",
- "failed to respond to {0} on port {1}",
- new Object[]{Arrays.asList(addr), Integer.valueOf(port)},
- e);
- }
- }
- }
-
- /** attempt a connection to multicast request client */
- private void attemptResponse(InetSocketAddress addr, int timeout)
- throws Exception
- {
- Socket s = reggie.socketFactory.createSocket();
- try {
- s.connect(addr, timeout);
- reggie.respond(s);
- } finally {
- try {
- s.close();
- } catch (IOException e) {
- logger.log(Levels.HANDLED, "exception closing socket", e);
- }
- }
- }
-
- @Override
- public int compareTo(AddressTask o) {
- int hostCompare = host.compareTo(o.host);
- if ( hostCompare == -1) return -1;
- if ( hostCompare == 1) return 1;
- if (port < o.port) return -1;
- if (port > o.port) return 1;
- return 0;
- }
- }
-
- /** Socket for unicast discovery response. */
- private static final class SocketTask implements Runnable {
-
- /** The socket */
- public final Socket socket;
- public final RegistrarImpl reggie;
-
- /** Simple constructor */
- public SocketTask(Socket socket, RegistrarImpl reggie) {
- this.socket = socket;
- this.reggie = reggie;
- }
-
- /** Process a unicast discovery request */
- public void run() {
- try {
- reggie.respond(socket);
- } catch (Exception e) {
- if (logger.isLoggable(Levels.HANDLED)) {
- logThrow(
- Levels.HANDLED,
- getClass().getName(),
- "run",
- "exception handling unicast discovery from {0}:{1}",
- new Object[] {
- socket.getInetAddress(),
- Integer.valueOf(socket.getPort())}
- ,
- e);
- }
- }
- }
- }
-
- /** Service lease expiration thread code */
- private static class ServiceExpire implements Runnable {
- final RegistrarImpl reggie;
- /** Create a daemon thread */
- public ServiceExpire(RegistrarImpl reggie) {
- this.reggie = reggie;
- }
-
- public void run() {
- try {
- reggie.concurrentObj.writeLock();
- } catch (ConcurrentLockException e) {
- return;
- }
- try {
- while (!Thread.currentThread().isInterrupted()) {
- long now = System.currentTimeMillis();
- while (true) {
- SvcReg reg = reggie.serviceByTime.first();
- reggie.minSvcExpiration = reg.leaseExpiration;
- if (reggie.minSvcExpiration > now)
- break;
- reggie.deleteService(reg, now);
- reggie.addLogRecord(new ServiceLeaseCancelledLogObj(
- reg.item.serviceID, reg.leaseID));
- if (logger.isLoggable(Level.FINE)) {
- logger.log(
- Level.FINE,
- "expired service registration {0}",
- new Object[]{ reg.item.serviceID });
- }
- }
- try {
- reggie.serviceNotifier.await(reggie.minSvcExpiration - now, TimeUnit.MILLISECONDS);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt(); //Restore interrupt.
- return;
- }
- }
- } finally {
- reggie.concurrentObj.writeUnlock();
- }
- }
- }
-
- /** Event lease expiration thread code */
- private static class EventExpire implements Runnable {
- private final RegistrarImpl reggie;
- /** Create a daemon thread */
- public EventExpire(RegistrarImpl reggie) {
- this.reggie = reggie;
- }
-
- public void run() {
- try {
- reggie.concurrentObj.writeLock();
- } catch (ConcurrentLockException e) {
- return;
- }
- try {
- while (!Thread.currentThread().isInterrupted()) {
- long now = System.currentTimeMillis();
- reggie.minEventExpiration = Long.MAX_VALUE;
- while (!reggie.eventByTime.isEmpty()) {
- EventReg reg = reggie.eventByTime.firstKey();
- if (reg.getLeaseExpiration() > now) {
- reggie.minEventExpiration = reg.getLeaseExpiration();
- break;
- }
- reggie.deleteEvent(reg);
- if (logger.isLoggable(Level.FINE)) {
- logger.log(
- Level.FINE,
- "expired event registration {0} for {1}",
- new Object[]{ reg.leaseID, reg.listener });
- }
- }
- try {
- reggie.eventNotifier.await(reggie.minEventExpiration - now, TimeUnit.MILLISECONDS);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt(); // restore
- return;
- }
- }
- } finally {
- reggie.concurrentObj.writeUnlock();
- }
- }
- }
-
- /**
- * Termination thread code. We do this in a separate thread to
- * avoid deadlock, because ActivationGroup.inactive will block until
- * in-progress RMI calls are finished.
- */
- private static class Destroy implements Runnable {
- private final RegistrarImpl reggie;
- /** Create a non-daemon thread */
- public Destroy(RegistrarImpl reggie) {
- this.reggie = reggie;
- }
-
- public void run() {
- long now = System.currentTimeMillis();
- long endTime = now + reggie.unexportTimeout;
- if (endTime < 0)
- endTime = Long.MAX_VALUE;
- boolean unexported = false;
- /* first try unexporting politely */
- while(!unexported && (now < endTime)) {
- unexported = reggie.serverExporter.unexport(false);
- if (!unexported) {
- try {
- final long sleepTime =
- Math.min(reggie.unexportWait, endTime - now);
- Thread.currentThread().sleep(sleepTime);
- now = System.currentTimeMillis();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt(); // restore
- logger.log(
- Levels.HANDLED, "exception during unexport wait", e);
- }
- }
- }
- /* if still not unexported, forcibly unexport */
- if(!unexported) {
- reggie.serverExporter.unexport(true);
- }
-
- /* all daemons must terminate before deleting persistent store */
- reggie.serviceExpirer.interrupt();
- reggie.eventExpirer.interrupt();
- reggie.unicaster.interrupt();
- reggie.multicaster.interrupt();
- reggie.announcer.interrupt();
- reggie.snapshotter.interrupt();
- reggie.eventNotifierExec.shutdown();
- List<Runnable> cancelledTasks = reggie.discoveryResponseExec.shutdownNow();
- reggie.joiner.terminate();
- reggie.discoer.terminate();
- try {
- reggie.serviceExpirer.join();
- reggie.eventExpirer.join();
- reggie.unicaster.join();
- reggie.multicaster.join();
- reggie.announcer.join();
- reggie.snapshotter.join();
- } catch (InterruptedException e) {
- }
- reggie.closeRequestSockets(cancelledTasks);
- if (reggie.log != null) {
- reggie.log.deletePersistentStore();
- logger.finer("deleted persistence directory");
- }
- if (reggie.activationID != null) {
- try {
- ActivationGroup.inactive(reggie.activationID, reggie.serverExporter);
- } catch (Exception e) {
- logger.log(Level.INFO, "exception going inactive", e);
- }
- }
- if (reggie.lifeCycle != null) {
- reggie.lifeCycle.unregister(reggie);
- }
- if (reggie.loginContext != null) {
- try {
- reggie.loginContext.logout();
- } catch (LoginException e) {
- logger.log(Level.INFO, "logout failed", e);
- }
- }
- logger.info("Reggie shutdown completed");
- }
- }
-
- /** Multicast discovery request thread code. */
- private static class Multicast implements Runnable, Interruptable {
- private final RegistrarImpl reggie;
- /** Multicast group address used by multicast requests */
- private final InetAddress requestAddr;
- /** Multicast socket to receive packets */
- private final MulticastSocket socket;
- /** Interfaces for which configuration failed */
- private final List<NetworkInterface> failedInterfaces;
-
- private final Set<AddressTask> runningTasks;
-
- private volatile boolean interrupted = false;
-
- /**
- * Create a high priority daemon thread. Set up the socket now
- * rather than in run, so that we get any exception up front.
- */
- public Multicast(RegistrarImpl reggie) throws IOException {
- this.runningTasks = new ConcurrentSkipListSet<AddressTask>();
- this.reggie = reggie;
- List<NetworkInterface> failedInterfaces = new ArrayList<NetworkInterface>();
- if (reggie.multicastInterfaces != null && reggie.multicastInterfaces.length == 0)
- {
- requestAddr = null;
- socket = null;
- this.failedInterfaces = failedInterfaces;
- return;
- }
- requestAddr = Constants.getRequestAddress();
- socket = new MulticastSocket(Constants.discoveryPort);
- if (reggie.multicastInterfaces != null) {
- Level failureLogLevel = reggie.multicastInterfacesSpecified ?
- Level.WARNING : Levels.HANDLED;
- int l = reggie.multicastInterfaces.length;
- for (int i = 0; i < l; i++) {
- NetworkInterface nic = reggie.multicastInterfaces[i];
- try {
- socket.setNetworkInterface(nic);
- socket.joinGroup(requestAddr);
- } catch (IOException e) {
- failedInterfaces.add(nic);
- if (logger.isLoggable(failureLogLevel)) {
- logThrow(
- failureLogLevel,
- getClass().getName(),
- "<init>",
- "exception enabling {0}",
- new Object[]{ nic },
- e);
- }
- }
- }
- } else {
- try {
- socket.joinGroup(requestAddr);
- } catch (IOException e) {
- failedInterfaces.add(null);
- logger.log(
- Level.WARNING,
- "exception enabling default interface", e);
- }
- }
- this.failedInterfaces = failedInterfaces;
- }
-
- public void run() {
- if (reggie.multicastInterfaces != null && reggie.multicastInterfaces.length == 0)
- {
- return;
- }
- byte[] buf = new byte[
- reggie.multicastRequestConstraints.getMulticastMaxPacketSize(
- DEFAULT_MAX_PACKET_SIZE)];
- DatagramPacket dgram = new DatagramPacket(buf, buf.length);
- long retryTime =
- System.currentTimeMillis() + reggie.multicastInterfaceRetryInterval;
- while (!interrupted) {
- try {
- int timeout = 0;
- if (!failedInterfaces.isEmpty()) {
- timeout =
- (int) (retryTime - System.currentTimeMillis());
- if (timeout <= 0) {
- retryFailedInterfaces();
- if (failedInterfaces.isEmpty()) {
- timeout = 0;
- } else {
- timeout = reggie.multicastInterfaceRetryInterval;
- retryTime =
- System.currentTimeMillis() + timeout;
- }
- }
- }
- socket.setSoTimeout(timeout);
- dgram.setLength(buf.length);
- try {
- socket.receive(dgram);
- } catch (NullPointerException e) {
- break; // workaround for bug 4190513
- }
-
- int pv;
- try {
- pv = ByteBuffer.wrap(dgram.getData(),
- dgram.getOffset(),
- dgram.getLength()).getInt();
- } catch (BufferUnderflowException e) {
- throw new DiscoveryProtocolException(null, e);
- }
- reggie.multicastRequestConstraints.checkProtocolVersion(pv);
[... 9517 lines stripped ...]