You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2013/04/15 17:26:46 UTC
svn commit: r1468119 [11/15] - in /river/jtsk/skunk/qa_refactor/trunk:
qa/src/com/sun/jini/qa/harness/ qa/src/com/sun/jini/test/impl/mahalo/
qa/src/com/sun/jini/test/resources/ qa/src/com/sun/jini/test/share/
qa/src/com/sun/jini/test/spec/javaspace/con...
Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/OutriggerServerImpl.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/OutriggerServerImpl.java?rev=1468119&r1=1468118&r2=1468119&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/OutriggerServerImpl.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/OutriggerServerImpl.java Mon Apr 15 15:26:44 2013
@@ -1,4060 +1,4061 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.sun.jini.outrigger;
-
-import com.sun.jini.config.Config;
-import com.sun.jini.constants.TimeConstants;
-import com.sun.jini.landlord.Landlord;
-import com.sun.jini.landlord.LandlordUtil;
-import com.sun.jini.landlord.LeasedResource;
-import com.sun.jini.landlord.LeasePeriodPolicy;
-import com.sun.jini.landlord.FixedLeasePeriodPolicy;
-import com.sun.jini.landlord.LocalLandlord;
-import com.sun.jini.landlord.LeaseFactory;
-import com.sun.jini.logging.Levels;
-import com.sun.jini.start.LifeCycle;
-import com.sun.jini.start.Starter;
-
-import net.jini.id.Uuid;
-import net.jini.id.UuidFactory;
-import net.jini.activation.ActivationGroup;
-
-import net.jini.config.Configuration;
-import net.jini.config.ConfigurationProvider;
-import net.jini.config.ConfigurationException;
-
-import net.jini.export.Exporter;
-import net.jini.jeri.BasicJeriExporter;
-import net.jini.jeri.BasicILFactory;
-import net.jini.jeri.tcp.TcpServerEndpoint;
-
-import net.jini.core.constraint.RemoteMethodControl;
-import net.jini.security.TrustVerifier;
-import net.jini.security.ProxyPreparer;
-import net.jini.security.proxytrust.ServerProxyTrust;
-
-import net.jini.activation.ActivationExporter;
-
-import net.jini.core.discovery.LookupLocator;
-import net.jini.core.lookup.ServiceID;
-import net.jini.core.entry.Entry;
-import net.jini.core.event.EventRegistration;
-import net.jini.core.event.RemoteEventListener;
-import net.jini.core.lease.Lease;
-import net.jini.core.lease.LeaseDeniedException;
-import net.jini.core.lease.UnknownLeaseException;
-import net.jini.core.transaction.CannotJoinException;
-import net.jini.core.transaction.CannotNestException;
-import net.jini.core.transaction.Transaction;
-import net.jini.core.transaction.TransactionException;
-import net.jini.core.transaction.UnknownTransactionException;
-import net.jini.core.transaction.server.ServerTransaction;
-import net.jini.core.transaction.server.TransactionManager;
-import net.jini.lookup.entry.ServiceInfo;
-import net.jini.space.InternalSpaceException;
-import net.jini.space.JavaSpace;
-
-import java.io.IOException;
-import java.rmi.MarshalledObject;
-import java.rmi.NoSuchObjectException;
-import java.rmi.RemoteException;
-import java.rmi.UnmarshalException;
-import java.rmi.activation.ActivationID;
-import java.rmi.activation.ActivationSystem;
-import java.rmi.activation.ActivationException;
-import java.security.SecureRandom;
-import java.security.PrivilegedExceptionAction;
-import java.security.PrivilegedActionException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
-import java.util.WeakHashMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.security.auth.Subject;
-import javax.security.auth.login.LoginContext;
-import javax.security.auth.login.LoginException;
-
-/**
- * A basic implementation of a JavaSpaces<sup><font size=-2>TM</font></sup>
- * service. This class is designed for use by both transient and
- * persistent instances. Persistence is delegated to <code>Store</code>
- * and <code>LogOps</code> objects which handles the details
- * of implementing a particular persistence strategy. If transient
- * a <code>null</code> value is used for the <code>LogOps</code> object.
- * <p>
- * <code>OutriggerServerImpl</code> maintains a list of types of
- * entries it has seen and their subtypes using a
- * <code>TypeTree</code> object. Each type of entry has an
- * <code>EntryHolder</code> that is stored in the
- * <code>EntryHolderSet</code> object named <code>contents</code>.
- * <p>
- * On <code>write</code>, the written entry's class and superclass are
- * added to the known <code>types</code>, and its <code>EntryRep</code>
- * is added to the space's <code>contents</code>.
- * <p>
- * On <code>read</code>, the <code>find</code> method searches through
- * the entries of its type and subtypes, asking each entry holder if it
- * has an entry that matches the template. If a match is found, the
- * matching <code>EntryRep</code> is returned. If none of the
- * appropriate holders has a match, it will return <code>null</code>.
- * <p>
- * On <code>take</code> we also use <code>find</code> with a
- * <code>boolean</code> that says to remove the entry that matches.
- * <p>
- * Notification requires a separate <code>Notifier</code> queue and
- * thread. When an entry is written, a reference to it is added to the
- * queue of "unexamined entries". The notifier thread pulls entries
- * off the queue and checks them against registered notification
- * templates. When it has found all matches for the template, the
- * <code>Notifier</code> thread adds the notifications for this write
- * to its list of undelivered notifications, which it periodically
- * attempts to deliver.
- * <p>
- * On <code>notify</code>, the template is added to the
- * <code>TemplateHolderSet</code> named <code>template</code>. This
- * stores <code>TemplateHolder</code> objects for each known type.
- * <p>
- * In this implementation, <code>EntryRep</code> ID's are approximate
- * time stamps.
- *
- * @author Sun Microsystems, Inc.
- */
-public class OutriggerServerImpl
- implements OutriggerServer, TimeConstants, LocalLandlord, Recover,
- ServerProxyTrust, Starter
-{
- /**
- * Component name we use to find items in the configuration and loggers.
- */
- public final static String COMPONENT_NAME = "com.sun.jini.outrigger";
-
- /**
- * Logger name for information related to starting/restarting/destroying
- * the service
- */
- static final String lifecycleLoggerName = COMPONENT_NAME + ".lifecycle";
-
- /** Logger name for information related top level operations */
- static final String opsLoggerName = COMPONENT_NAME + ".operations";
-
- /** Logger name for information related to transactions */
- static final String txnLoggerName = COMPONENT_NAME + ".transactions";
-
- /** Logger name for information related to leases and leasing */
- static final String leaseLoggerName = COMPONENT_NAME + ".leases";
-
- /** Logger name for information related to iterators */
- static final String iteratorLoggerName = COMPONENT_NAME + ".iterator";
-
- /** Logger name for information related to join state */
- static final String joinLoggerName = COMPONENT_NAME + ".join";
-
- /** Logger name for information related to entry matching */
- static final String matchingLoggerName = COMPONENT_NAME + ".entryMatching";
-
- /** Logger name for information related to events */
- static final String eventLoggerName = COMPONENT_NAME + ".event";
-
- /** Logger name for information related to persistence */
- static public final String storeLoggerName = COMPONENT_NAME + ".store";
-
- /**
- * Logger for information related to starting/restarting/destroying
- * the service
- */
- static private final Logger lifecycleLogger =
- Logger.getLogger(lifecycleLoggerName);
-
- /** Logger for information related to top level operations */
- static private final Logger opsLogger = Logger.getLogger(opsLoggerName);
-
- /** Logger for information related to transactions */
- static private final Logger txnLogger = Logger.getLogger(txnLoggerName);
-
- /** Logger for information related to leases and leasing */
- static private final Logger leaseLogger =
- Logger.getLogger(leaseLoggerName);
-
- /** Logger for logging information to iterators */
- private static final Logger iteratorLogger =
- Logger.getLogger(iteratorLoggerName);
-
- /** Logger for logging information about join state */
- private static final Logger joinLogger = Logger.getLogger(joinLoggerName);
-
- /**
- * The name of the configuration entry we use to get the
- * the name of the log directory from.
- */
- public static final String PERSISTENCE_DIR_CONFIG_ENTRY =
- "persistenceDirectory";
-
- /**
- * List of <code>com.sun.jini.outrigger.EntryHolder</code>s for
- * each specific type.
- */
- private final EntryHolderSet contents;
-
- /**
- * A list of known subclasses of each class of entry.
- */
- private final TypeTree types = new TypeTree();
-
- /**
- * A list of hashes to check against class types.
- */
- private final HashMap<String,Long> typeHashes = new HashMap<String,Long>();
-
- /**
- * Templates for which someone has registered interest
- */
- private final TransitionWatchers templates;
-
- /**
- * A map from event registration cookies to
- * <code>EventRegistrationRecord</code> instances
- */
- final private Map<Uuid,LeasedResource> eventRegistrations =
- Collections.synchronizedMap(new java.util.HashMap<Uuid,LeasedResource>());
-
- /**
- * A map from contents result cookies to <code>ContentsQuery</code> objects.
- */
- final private Map<Uuid,LeasedResource> contentsQueries =
- Collections.synchronizedMap(new java.util.HashMap<Uuid,LeasedResource>());
-
- /**
- * The transactions recovered after restart. This table
- * is cleared after recovery
- */
- private final Map<Long,Txn> recoveredTxns = new ConcurrentHashMap<Long,Txn>();
-
- /**
- * The transactions in which this space is a participant.
- * Includes broken <code>Txn</code>s.
- */
- private final TxnTable txnTable;
-
- /**
- * The crash count. This must be different each boot that forgets
- * (implicitly aborts) pending transactions on shutdown.
- */
- private final long crashCount = System.currentTimeMillis();
-
- /**
- * The reaper thread for expired notifications
- *
- * @serial
- */
- private final TemplateReaper templateReaperThread;
-
- /**
- * The reaper thread for removed entries.
- * Otherwise the entries are not garbage collected after removal
- * since their "prev" links are never traversed which has the side
- * effect of removing the associated object references.
- *
- * @serial
- */
- private final EntryReaper entryReaperThread;
-
- /**
- * The reaper thread for expired contents queries
- *
- * @serial
- */
- private final ContentsQueryReaper contentsQueryReaperThread;
-
- /**
- * Object that recored operations on entries and makes sure they
- * get seen by the watchers in <code>templates</code>.
- */
- private final OperationJournal operationJournal;
-
- /**
- * The notification object
- *
- * @serial
- */
- private volatile Notifier notifier;
-
- /**
- * Object that queues up lease expirations for future logging.
- * Only allocated if we have a store, otherwise left
- * <code>null</code>
- */
- private final ExpirationOpQueue expirationOpQueue;
-
- /**
- * The monitor for ongoing transactions.
- */
- private final TxnMonitor txnMonitor;
-
- /**
- * The wrapper that intercepts incoming remote calls for locking
- * purposes and then delegates to us. This is the object that gets
- * exported.
- */
- private final OutriggerServerWrapper serverGate;
-
- /** Object to notify if we destroy ourselves, may be <code>null</code> */
- private final LifeCycle lifeCycle;
-
- /**
- * Object we used to export ourselves.
- */
- private final Exporter exporter;
-
- /**
- * The remote ref (e.g. stub or dynamic proxy) for this
- * server.
- * @serial
- */
- private final OutriggerServer ourRemoteRef;
-
- /**
- * The <code>Uuid</code> for this service. Used in the
- * <code>SpaceProxy2</code> and <code>AdminProxy</code> to
- * implement reference equality. We also derive our
- * <code>ServiceID</code> from it.
- */
- private volatile Uuid topUuid = null;
-
- /**
- * The proxy for this space.
- *
- * @serial
- */
- private volatile SpaceProxy2 spaceProxy;
-
- /**
- * The admin proxy for this space.
- *
- * @serial
- */
- private volatile AdminProxy adminProxy;
-
- /**
- * The participant proxy for this space
- *
- * @serial
- */
- private volatile ParticipantProxy participantProxy;
-
- /**
- * Holds the basis/lower bound for all sequence numbers issued.
- * Sequence numbers are included in the various notifications given
- * by Outrigger. If Outrigger crashes and restarts, the sequence
- * numbers issued by the new invocation must not overlap with those
- * already issued. To ensure this, begin issuing sequence numbers
- * in the new Outrigger process with values far in excess of
- * anything previously issued.
- * <p>
- * For any given invocation of Outrigger, the numbers are "fully
- * ordered." However, when Outrigger restarts after a crash, the
- * numbers (when compared to the previous invocation) will appear
- * to have a [large] gap.
- * <p>
- * [See the JavaSpaces Service Specification for detail on "fully
- * ordered".]
- */
- private final AtomicLong sessionId = new AtomicLong();
-
- /**
- * Policy used to create and renew leases on entries
- */
- private final LeasePeriodPolicy entryLeasePolicy;
-
- /**
- * Policy used to create and renew leases on event registrations
- */
- private final LeasePeriodPolicy eventLeasePolicy;
-
- /**
- * Policy used to create and renew leases on event contents queries
- */
- private final LeasePeriodPolicy contentsLeasePolicy;
-
- /**
- * Factory we use to create leases
- */
- private volatile LeaseFactory leaseFactory;
-
- /**
- * @serial
- */
- private final JoinStateManager joinStateManager = new JoinStateManager();
-
- /**
- * Our IDs 64 bits secure random numbers, this is
- * <code>SecureRandom</code> instance we use to create them.
- */
- private static final SecureRandom idGen = new SecureRandom();
-
- /** Our activation ID, <code>null</code> if we are not activatable */
- private final ActivationID activationID;
-
- /**
- * A prepared reference to the activation system, <code>null</code> if
- * we are not activatable.
- */
- private final ActivationSystem activationSystem;
-
-
- /**
- * Store - The reference to the persistent store, if any.
- */
- private volatile Store store;
-
- /**
- * Log object to record state and operation information. The
- * store provides this object. Will be <code>null</code> if
- * this is a transient server instance.
- */
- private volatile LogOps log = null;
-
- /** The map of <code>Uuid</code> to active iterations */
- private final Map iterations =
- Collections.synchronizedMap(new java.util.HashMap());
-
- /** The login context, for logging out */
- private final LoginContext loginContext;
-
- /** <code>ProxyPreparer</code> for transaction managers */
- private final ProxyPreparer transactionManagerPreparer;
-
- /** <code>ProxyPreparer</code> for event listeners */
- private final ProxyPreparer listenerPreparer;
-
- /**
- * <code>ProxyPreparer</code> for transaction managers
- * that get recovered from the store. <code>null</code> if
- * this is not a persistent space.
- */
- private final ProxyPreparer recoveredTransactionManagerPreparer;
-
- /**
- * <code>ProxyPreparer</code> for event listeners
- * that get recovered from the store. <code>null</code> if
- * this is not a persistent space.
- */
- private final ProxyPreparer recoveredListenerPreparer;
-
- /** Max number of entries to return in a next call */
- private final int nextLimit;
-
- /** Max number of entries to return in a take multiple call */
- private final int takeLimit;
-
- /**
- * When destroying the space, how long to wait for a clean
- * unexport (which allows the destroy call to return) before
- * giving up calling <code>unexport(true)</code>
- */
- private final long maxUnexportDelay;
-
- /**
- * Length of time to sleep between unexport attempts
- */
- private final long unexportRetryDelay;
-
- // Synchronized access with thisl
- private boolean started = false;
- /** Temporary start state Object variables set to null after start */
- private Configuration config;
- private Thread starter;
- private Throwable thrown;
- private Exception except;
- private boolean persistent;
- private long maxServerQueryTimeout;
-
- /**
- * Create a new <code>OutriggerServerImpl</code> server (possibly a
- * new incarnation of an activatable one).
- * Exports the server as well.
- *
- * @param activationID of the server, may be null.
- * @param lifeCycle the object to notify when this
- * service is destroyed, may be null.
- * @param configArgs set of strings to be used to obtain a
- * <code>Configuration</code>.
- * @param persistent If <code>true</code> will throw an
- * <code>ConfigurationException</code>
- * if there is no persistence directory or
- * store specified in the configuration.
- * @param wrapper the wrapper that intercepts
- * incoming remote calls before delegating
- * them to <code>this</code>.
- * @throws IOException if there is problem recovering data
- * from disk, exporting the server, or unpacking
- * <code>data</code>.
- * @throws ConfigurationException if the <code>Configuration</code> is
- * malformed.
- * @throws ActivationException if activatable and there
- * is a problem getting a reference to the activation system.
- * @throws LoginException if the <code>loginContext</code> specified
- * in the configuration is non-null and throws
- * an exception when login is attempted.
- */
- OutriggerServerImpl(final ActivationID activationID, LifeCycle lifeCycle,
- String[] configArgs, final boolean persistent,
- final OutriggerServerWrapper wrapper)
- throws IOException, ConfigurationException, LoginException,
- ActivationException
- {
- this.lifeCycle = lifeCycle;
- this.serverGate = wrapper;
- this.persistent = persistent;
- InitHolder h = null;
- LoginContext loginContext = null;
- try {
- final Configuration config =
- ConfigurationProvider.getInstance(configArgs,
- getClass().getClassLoader());
- this.config = config;
- loginContext = (LoginContext) config.getEntry(
- COMPONENT_NAME, "loginContext", LoginContext.class, null);
- if (loginContext == null) {
- h = init(config, persistent, activationID, wrapper);
- } else {
- loginContext.login();
- try {
- h = Subject.doAsPrivileged(
- loginContext.getSubject(),
- new PrivilegedExceptionAction<InitHolder>() {
- public InitHolder run() throws Exception {
- return init(config, persistent, activationID, wrapper);
- }
- },
- null);
- } catch (PrivilegedActionException e) {
- throw e.getCause();
- }
- }
- } catch (IOException e) {
- unwindConstructor(e);
- except = e;
- } catch (ConfigurationException e) {
- unwindConstructor(e);
- except = e;
- } catch (LoginException e) {
- unwindConstructor(e);
- except = e;
- } catch (RuntimeException e) {
- unwindConstructor(e);
- except = e;
- } catch (Throwable e) {
- unwindConstructor(e);
- thrown = e;
- }
- if (thrown == null && except == null) {
- lifecycleLogger.log(Level.INFO, "Outrigger server started: {0}", this);
- this.loginContext = loginContext;
- this.activationID = h.activationID;
- txnMonitor = h.txnMonitor;
- activationSystem = h.activationSystem;
- transactionManagerPreparer = h.transactionManagerPreparer;
- listenerPreparer = h.listenerPreparer;
- exporter = h.exporter;
- ourRemoteRef = h.ourRemoteRef;
- contents = h.contents;
- templates = h.templates;
- operationJournal = h.operationJournal;
- expirationOpQueue = h.expirationOpQueue;
- recoveredTransactionManagerPreparer = h.recoveredTransactionManagerPreparer;
- recoveredListenerPreparer = h.recoveredListenerPreparer;
- txnTable = h.txnTable;
- leaseFactory = h.leaseFactory;
- entryLeasePolicy = h.entryLeasePolicy;
- eventLeasePolicy = h.eventLeasePolicy;
- contentsLeasePolicy = h.contentsLeasePolicy;
- nextLimit = h.nextLimit;
- takeLimit = h.takeLimit;
- maxUnexportDelay = h.maxUnexportDelay;
- unexportRetryDelay = h.unexportRetryDelay;
- notifier = h.notifier;
- templateReaperThread = h.templateReaperThread;
- entryReaperThread = h.entryReaperThread;
- contentsQueryReaperThread = h.contentsQueryReaperThread;
- starter = h.starter;
- maxServerQueryTimeout = h.maxServerQueryTimeout;
- } else {
- lifecycleLogger.log(Level.SEVERE, "Failed to construct Outrigger server", except == null ? thrown : except);
- this.loginContext = null;
- this.activationID = null;
- txnMonitor = null;
- activationSystem = null;
- transactionManagerPreparer = null;
- listenerPreparer = null;
- exporter = null;
- ourRemoteRef = null;
- contents = null;
- templates = null;
- operationJournal = null;
- expirationOpQueue = null;
- recoveredTransactionManagerPreparer = null;
- recoveredListenerPreparer = null;
- txnTable = null;
- leaseFactory = null;
- entryLeasePolicy = null;
- eventLeasePolicy = null;
- contentsLeasePolicy = null;
- nextLimit = 0;
- takeLimit = 0;
- maxUnexportDelay = 0;
- unexportRetryDelay = 0;
- notifier = null;
- templateReaperThread = null;
- entryReaperThread = null;
- contentsQueryReaperThread = null;
- starter = null;
- maxServerQueryTimeout = 0;
- }
- }
-
- public void start() throws Exception {
- // Threads are created with login context if it exists, but started in
- // privileged context.
- synchronized (this){
- if (started) return;
- started = true;
- }
- if (thrown != null) throw (Error) thrown;
- if (except != null) throw except;
- try {
- // This takes a while the first time, so let's get it going
- starter.start();
-
- // Get store from configuration
- if (persistent) {
- store = (Store)Config.getNonNullEntry(config,
- COMPONENT_NAME,
- "store", Store.class);
- expirationOpQueue.start();
- }
- // If we have a store, recover the log
- if (store != null) {
- log = store.setupStore(this);
-
- // Record this boot
- //
- log.bootOp(System.currentTimeMillis(), getSessionId());
- recoverTxns();
- } else if (activationID != null || persistent) {
- /* else we don't have a store, if we need one complain
- * will be logged by constructor
- */
- throw new ConfigurationException("Must provide for a " +
- "store for component " + COMPONENT_NAME + ", by providing " +
- "valid values for the store or " +
- PERSISTENCE_DIR_CONFIG_ENTRY + " entries if creating " +
- " a persistent space");
- }
-
- /* Now that we have recovered any store we have, create a
- * Uuid if there was not one in the store.
- */
- if (topUuid == null) {
- topUuid = UuidFactory.generate();
- if (log != null)
- log.uuidOp(topUuid);
- }
-
- if (ourRemoteRef instanceof RemoteMethodControl) {
- spaceProxy = new ConstrainableSpaceProxy2(ourRemoteRef, topUuid,
- maxServerQueryTimeout, null);
- adminProxy =
- new ConstrainableAdminProxy(ourRemoteRef, topUuid, null);
- participantProxy =
- new ConstrainableParticipantProxy(ourRemoteRef, topUuid, null);
- } else {
- spaceProxy = new SpaceProxy2(ourRemoteRef, topUuid,
- maxServerQueryTimeout);
- adminProxy = new AdminProxy(ourRemoteRef, topUuid);
- participantProxy = new ParticipantProxy(ourRemoteRef, topUuid);
- }
-
- leaseFactory = new LeaseFactory(ourRemoteRef, topUuid);
-
- /* Kick off independent threads. */
-
- // start the JoinStateManager
- joinStateManager.startManager(config, log, spaceProxy,
- new ServiceID(topUuid.getMostSignificantBits(),
- topUuid.getLeastSignificantBits()),
- attributesFor());
- // Notifier uses TaskManager, which doesn't start threads until given tasks.
- notifier = new Notifier(spaceProxy, recoveredListenerPreparer, config);
- operationJournal.start();
- templateReaperThread.start();
- entryReaperThread.start();
- contentsQueryReaperThread.start();
- } catch (Exception e) {
- // Clean up and rethrow.
- lifecycleLogger.log(Level.SEVERE, "Failed to start Outrigger server", e);
- // If we created a JoinStateManager,
-
- try {
- joinStateManager.destroy();
- } catch (Throwable t) {
- // Ignore and go on
- }
-
- if (expirationOpQueue != null)
- expirationOpQueue.terminate();
-
- if (txnMonitor != null) {
- try {
- txnMonitor.destroy();
- } catch (Throwable t) {
- // Ignore and go on
- }
- }
-
- // Interrupt and join independent threads
- if (notifier != null) {
- try {
- notifier.terminate();
- } catch (Throwable t) {
- // Ignore and go on
- }
- }
-
- if (operationJournal != null) {
- try {
- operationJournal.terminate();
- } catch (Throwable t) {
- // Ignore and go on
- }
- }
-
- unwindReaper(templateReaperThread);
- unwindReaper(entryReaperThread);
- unwindReaper(contentsQueryReaperThread);
-
- // Close (but do not destroy) the store
- if (store != null) {
- try {
- store.close();
- } catch (Throwable t) {
- // Ignore and go on
- }
- }
- throw e;
- } finally {
- config = null;
- starter = null;
- except = null;
- thrown = null;
- }
- }
-
- private static class InitHolder {
- ActivationID activationID;
- TxnMonitor txnMonitor;
- ActivationSystem activationSystem;
- ProxyPreparer transactionManagerPreparer;
- ProxyPreparer listenerPreparer;
- Exporter exporter;
- OutriggerServer ourRemoteRef;
- EntryHolderSet contents;
- TransitionWatchers templates;
- OperationJournal operationJournal;
- ExpirationOpQueue expirationOpQueue;
- ProxyPreparer recoveredTransactionManagerPreparer;
- ProxyPreparer recoveredListenerPreparer;
- TxnTable txnTable;
- LeaseFactory leaseFactory;
- LeasePeriodPolicy entryLeasePolicy;
- LeasePeriodPolicy eventLeasePolicy;
- LeasePeriodPolicy contentsLeasePolicy;
- int nextLimit;
- int takeLimit;
- long maxUnexportDelay;
- long unexportRetryDelay;
- Notifier notifier;
- TemplateReaper templateReaperThread;
- EntryReaper entryReaperThread;
- ContentsQueryReaper contentsQueryReaperThread;
- Thread starter;
- long maxServerQueryTimeout;
- }
-
- /**
- * The bulk of the work for creating an
- * <code>OutriggerServerImpl</code> server. Anything that needs
- * to be done with a subject is done by this method. Assumes the
- * <code>serverGate</code> and <code>activationID</code> fields have
- * been set.
- *
- * @param config The configuration being used to configure
- * this server.
- * @param persistent If <code>true</code> will throw an
- * <code>ConfigurationException</code>
- * if there is no persistence directory or
- * store specified in the configuration.
- *
- * @throws IOException if there is problem recovering data
- * from disk, exporting the server, or unpacking
- * <code>data</code>.
- * @throws ConfigurationException if the <code>Configuration</code> is
- * malformed. */
- private InitHolder init(Configuration config,
- boolean persistent,
- ActivationID activationID,
- OutriggerServerWrapper serverGate)
- throws IOException, ConfigurationException, ActivationException
- {
- InitHolder h = new InitHolder();
- try {
- h.txnMonitor = new TxnMonitor(this, config);
- /* Get the activation related preparers we need */
-
- // Default do nothing preparer
- final ProxyPreparer defaultPreparer =
- new net.jini.security.BasicProxyPreparer();
-
- if (activationID != null) {
- final ProxyPreparer aidPreparer =
- (ProxyPreparer)Config.getNonNullEntry(config,
- COMPONENT_NAME, "activationIdPreparer",
- ProxyPreparer.class, defaultPreparer);
-
- final ProxyPreparer aSysPreparer =
- (ProxyPreparer)Config.getNonNullEntry(config,
- COMPONENT_NAME, "activationSystemPreparer",
- ProxyPreparer.class, defaultPreparer);
-
- h.activationID =
- (ActivationID)aidPreparer.prepareProxy(h.activationID);
- h.activationSystem =
- (ActivationSystem)aSysPreparer.prepareProxy(
- ActivationGroup.getSystem());
- }
-
-
- // The preparers that all outrigger's need
- h.transactionManagerPreparer =
- (ProxyPreparer)Config.getNonNullEntry(config,
- COMPONENT_NAME, "transactionManagerPreparer",
- ProxyPreparer.class, defaultPreparer);
-
- h.listenerPreparer =
- (ProxyPreparer)Config.getNonNullEntry(config,
- COMPONENT_NAME, "listenerPreparer",
- ProxyPreparer.class, defaultPreparer);
-
- /* Export the server. */
-
- // Get the exporter
-
- /* What we use for the default (or in the default activatable case
- * what we make the underlying exporter).
- */
- final Exporter basicExporter =
- new BasicJeriExporter(TcpServerEndpoint.getInstance(0),
- new BasicILFactory(), false, true);
- if (activationID == null) {
- h.exporter = (Exporter)Config.getNonNullEntry(config,
- COMPONENT_NAME, "serverExporter", Exporter.class,
- basicExporter);
- } else {
- h.exporter = (Exporter)Config.getNonNullEntry(config,
- COMPONENT_NAME, "serverExporter", Exporter.class,
- new ActivationExporter(activationID, basicExporter),
- activationID);
- }
-
- h.ourRemoteRef = (OutriggerServer)h.exporter.export(serverGate);
-
- // Create our top level proxy
- h.maxServerQueryTimeout =
- Config.getLongEntry(config, COMPONENT_NAME,
- "maxServerQueryTimeout", Long.MAX_VALUE, 1, Long.MAX_VALUE);
-
- /* Initialize various fields that will be filled in during
- * log recovery.
- */
- h.contents = new EntryHolderSet(this);
- h.templates = new TransitionWatchers(this);
-
- // This takes a while the first time, so let's get it going
- h.starter = new Thread() {
- public void run() {
- OutriggerServerImpl.nextID();
- }
- };
-
-
- // Use this (trivially) in log recovery
- h.operationJournal = new OperationJournal(h.templates);
- h.operationJournal.setDaemon(true);
-
-
- if (persistent){
- /* If we have a store we need a ExpirationOpQueue and the
- * preparers for recovered proxies.
- */
- h.expirationOpQueue = new ExpirationOpQueue(this);
- h.expirationOpQueue.setDaemon(true);
- h.recoveredTransactionManagerPreparer =
- (ProxyPreparer)Config.getNonNullEntry(config,
- COMPONENT_NAME, "recoveredTransactionManagerPreparer",
- ProxyPreparer.class, defaultPreparer);
-
- h.recoveredListenerPreparer =
- (ProxyPreparer)Config.getNonNullEntry(config,
- COMPONENT_NAME, "recoveredListenerPreparer",
- ProxyPreparer.class, defaultPreparer);
- }
- /* Now that we have a recoveredTransactionManagerPreparer
- * (if we need one) create the TxnTable
- */
- h.txnTable = new TxnTable(h.recoveredTransactionManagerPreparer);
-
- /* Initialize any non-persistent state we have */
-
- // Create lease policy objects
- h.entryLeasePolicy = (LeasePeriodPolicy)Config.getNonNullEntry(
- config, COMPONENT_NAME, "entryLeasePeriodPolicy",
- LeasePeriodPolicy.class,
- new FixedLeasePeriodPolicy(Long.MAX_VALUE, 1 * DAYS));
-
- h.eventLeasePolicy = (LeasePeriodPolicy)Config.getNonNullEntry(
- config, COMPONENT_NAME, "eventLeasePeriodPolicy",
- LeasePeriodPolicy.class,
- new FixedLeasePeriodPolicy(1 * HOURS, 1 * HOURS));
-
- h.contentsLeasePolicy = (LeasePeriodPolicy)Config.getNonNullEntry(
- config, COMPONENT_NAME, "contentsLeasePeriodPolicy",
- LeasePeriodPolicy.class,
- new FixedLeasePeriodPolicy(1 * HOURS, 1 * HOURS));
-
- h.nextLimit = Config.getIntEntry(config, COMPONENT_NAME,
- "iteratorBatchSize", 100, 1, Integer.MAX_VALUE);
-
- h.takeLimit = Config.getIntEntry(config, COMPONENT_NAME,
- "takeMultipleLimit", 100, 1, Integer.MAX_VALUE);
-
- h.maxUnexportDelay = Config.getLongEntry(config, COMPONENT_NAME,
- "maxUnexportDelay", 2 * MINUTES, 0, Long.MAX_VALUE);
-
- h.unexportRetryDelay = Config.getLongEntry(config, COMPONENT_NAME,
- "unexportRetryDelay", SECONDS, 1, Long.MAX_VALUE);
-
-
- // Create threads but don't start.
- final long reapingInterval =
- Config.getLongEntry(config, COMPONENT_NAME, "reapingInterval",
- 10000, 1, Long.MAX_VALUE);
-
- final int reapingPriority =
- Config.getIntEntry(config, COMPONENT_NAME,"reapingPriority",
- Thread.NORM_PRIORITY, Thread.MIN_PRIORITY,
- Thread.MAX_PRIORITY);
-
- h.templateReaperThread = new TemplateReaper(reapingInterval);
- h.templateReaperThread.setPriority(reapingPriority);
- h.templateReaperThread.setDaemon(true);
- h.entryReaperThread = new EntryReaper(reapingInterval);
- h.entryReaperThread.setPriority(reapingPriority);
- h.entryReaperThread.setDaemon(true);
- h.contentsQueryReaperThread = new ContentsQueryReaper(reapingInterval);
- h.contentsQueryReaperThread.setPriority(reapingPriority);
- h.contentsQueryReaperThread.setDaemon(true);
- return h;
- } catch (IOException e) {
- unwindExporter(h.ourRemoteRef, h.exporter);
- throw e;
- } catch (ConfigurationException e){
- unwindExporter(h.ourRemoteRef, h.exporter);
- throw e;
- } catch (ActivationException e){
- unwindExporter(h.ourRemoteRef, h.exporter);
- throw e;
- }
- }
-
- /**
- * Undo any work a failed constructor might have done without
- * destroying the service. Has to work even if the
- * constructor failed part way through.
- * @param cause The exception that caused the
- * constructor to fail.
- */
- private void unwindConstructor(Throwable cause) {
- serverGate.rejectCalls(
- new RemoteException("Constructor failure", cause));
-
- lifecycleLogger.log(Level.SEVERE,
- "exception encountered while (re)starting server", cause);
- }
-
- private void unwindExporter(OutriggerServer os, Exporter e){
- // If we exported, unexport, use force=true since no calls we
- // care about should be in progress.
- if (os != null) {
- try {
- e.unexport(true);
- } catch (Throwable t) {
- // Ignore and go on
- }
- }
- }
-
- /** Kill the given reaper as quickly and quietly */
- private void unwindReaper(Reaper r) {
- if (r == null)
- return;
-
- try {
- r.kill();
- r.join();
- } catch (Throwable t) {
- // Ignore and go on
- }
- }
-
- /**
- * Process the recovered transactions. This includes forcing the
- * state to PREPARED, placing the txn on the tnxs list (or brokenTxns
- * list if the codebase is broken) and finally monitoring the txns.
- */
- private void recoverTxns() {
-
- if (recoveredTxns.isEmpty())
- return;
-
- // Only PREPARED transactions are recovered. Since the txn
- // state is ACTIVE when stored, we must now force the state
- // to PREPARED
- //
- final Collection values = recoveredTxns.values();
- final Iterator t = values.iterator();
- while (t.hasNext()) {
- // Note, Txns get recovered in the PREPARED state
- txnTable.recover((Txn)t.next());
- }
-
- // Monitor all of the pending transactions
- //
- monitor(values);
- recoveredTxns.clear(); // done with this list
- }
-
- long getSessionId() {
- return sessionId.get();
- }
-
- /**
- * Log a "cancel" operation. Called from
- * <code>EntryHolder</code> and watchers.
- */
- void cancelOp(Uuid cookie, boolean expired) {
- if (log != null) log.cancelOp(cookie, expired);
- }
-
- /**
- * Schedule a cancelOp.
- * @param cookie The cookie to pass to <code>cancelOp</code> when
- * it is called.
- */
- void scheduleCancelOp(Uuid cookie) {
- if (expirationOpQueue != null)
- expirationOpQueue.enqueue(cookie);
- }
-
- /**
- * Check if an entry (or template) class definition
- * has changed. If so, throw an exception. If it is a new
- * class, then we remember it.
- */
- private void typeCheck(EntryRep rep) throws UnmarshalException {
- if (rep == null)
- return;
-
- synchronized (typeHashes) {
- // Check the leaf class. If this (or any superclass)
- // check returns a match (checkClass() == true) we can
- // exit since each class hash covers its superclasses.
- //
- if (checkClass(rep.classFor(), rep.getHash()))
- return;
-
- // Check superclasses
- String[] superclasses = rep.superclasses();
- long[] hashes = rep.getHashes();
- for (int i = 0; i < superclasses.length; i++) {
- if (checkClass(superclasses[i], hashes[i]))
- return;
- }
- }
- }
-
- /**
- * Compare the given hash with the hash stored for the given
- * class name, if any. If there is a hash for the class name and
- * it does not match, throw an exception. If it does match return
- * true. Otherwise, record the new hash and class name and return
- * false.
- */
- private boolean checkClass(String className, long value)
- throws UnmarshalException
- {
- Long hash = (Long)typeHashes.get(className);
-
- if (hash == null) {
- typeHashes.put(className, Long.valueOf(value));
- return false; // new class
- } else {
- if (hash.longValue() != value) {
- final String msg = "Class mismatch: " + className;
- final UnmarshalException ue = new UnmarshalException(msg);
- opsLogger.log(Levels.FAILED, msg, ue);
- throw ue;
- }
- }
- return true; // match
- }
-
- /**
- * Utility method to calculate the lease duration/expiration for
- * a new resource and set the resource's expiration. Handles
- * various error conditions.
- */
- private LeasePeriodPolicy.Result grant(LeasedResource resource,
- long requestedDuration, LeasePeriodPolicy policy, String policyName)
- {
- final LeasePeriodPolicy.Result r;
- try {
- r = policy.grant(resource, requestedDuration);
- resource.setExpiration(r.expiration);
- } catch (LeaseDeniedException e) {
- // This should never happen, we should not be using a
- // policy or a factory that could thrown LeaseDenied on lease
- // creation, re-throw as InternalSpaceException
- throw logAndThrow(new
- InternalSpaceException("OutriggerServerImpl:" + policyName +
- ".grant threw LeaseDeniedException", e),
- opsLogger);
- }
-
- return r;
- }
-
- /**
- * Utility method to check for zero length arrays of entries
- * and throw an exception if necessary
- */
- private void checkForEmpty(EntryRep[] entries, String msg) {
- if (entries.length == 0)
- throw logAndThrowIllegalArg(msg);
- }
-
- /**
- * Utility method to check for null argument values and throw
- * an exception if necessary
- */
- private void checkForNull(Object value, String msg) {
- if (value == null)
- throw logAndThrow(new NullPointerException(msg), opsLogger);
- }
-
- /** Utility method to check for negative timeouts */
- private void checkTimeout(long timeout) {
- if (timeout < 0) {
- throw logAndThrowIllegalArg(
- "timeout = " + timeout + "must be a non-negative value");
- }
- }
-
- /** Utility method to check for non-postive limits */
- private void checkLimit(long limit) {
- if (limit < 1) {
- throw logAndThrowIllegalArg(
- "limit = " + limit + "must be a positive value");
- }
- }
-
- // purposefully inherit doc comment from supertype
- public long[] write(EntryRep rep, Transaction tr, long lease)
- throws TransactionException, RemoteException
- {
- opsLogger.entering("OutriggerServerImpl", "write");
-
- typeCheck(rep);
- rep.pickID();
-
- if (opsLogger.isLoggable(Level.FINER) && (tr != null)) {
- ServerTransaction str = serverTransaction(tr);
- opsLogger.log(Level.FINER, "OutriggerServerImpl: write under " +
- "transaction [mgr:{0} id:{1}]",
- new Object[]{str.mgr, Long.valueOf(str.id)});
- }
-
- Txn txn = enterTxn(tr);
-
- // Set expiration before adding to tables
- final LeasePeriodPolicy.Result r =
- grant(rep, lease, entryLeasePolicy, "entryLeasePolicy");
-
- final EntryHolder holder = contents.holderFor(rep);
- final EntryHandle handle = new EntryHandle(rep, txn, holder);
-
- // Verify that the transaction is still active. Lock it so
- // nobody can change it behind our backs while where making
- // this important determination. If there is no txn, just do it.
- try {
- if (txn != null)
- txn.ensureActive();
-
- // At this point nothing can go wrong, log this addition.
- if (log != null)
- log.writeOp(rep, (txn == null) ? null : txn.getId());
-
- synchronized (handle) {
- addWrittenRep(handle, holder, txn);
- recordTransition(new EntryTransition(handle, txn, true, true,
- true));
- }
-
- } finally {
- if (txn != null)
- txn.allowStateChange();
- }
-
- if (opsLogger.isLoggable(Level.FINEST)) {
- opsLogger.log(Level.FINEST, "writing {0} (txn = {1})",
- new Object[]{rep, txn});
- }
-
- return new long[] {r.duration,
- rep.id().getMostSignificantBits(),
- rep.id().getLeastSignificantBits()};
- }
-
- public long[] write(EntryRep[] entries, Transaction tr, long[] leaseTimes)
- throws TransactionException, RemoteException
- {
- opsLogger.entering("OutriggerServerImpl", "write<multiple>");
- checkForEmpty(entries, "Must write at least one entry");
-
- if (entries.length != leaseTimes.length) {
- throw logAndThrowIllegalArg(
- "Collection of entries and lease times must be same length");
- }
-
- for (int i=0;i<entries.length;i++) {
- checkForNull(entries[i], "Can't write null entry");
-
- if (leaseTimes[i] < 0 && leaseTimes[i] != Lease.ANY) {
- throw logAndThrowIllegalArg("Bad requested lease length:" +
- leaseTimes[i]);
- }
- }
-
- for (int i=0; i<entries.length; i++) {
- typeCheck(entries[i]);
- entries[i].pickID();
- }
-
- if (opsLogger.isLoggable(Level.FINER) && (tr != null)) {
- ServerTransaction str = serverTransaction(tr);
- opsLogger.log(Level.FINER, "OutriggerServerImpl: write<multiple> " +
- "under transaction [mgr:{0} id:{1}]",
- new Object[]{str.mgr, Long.valueOf(str.id)});
- }
-
- Txn txn = enterTxn(tr);
-
- final LeasePeriodPolicy.Result[] leaseData =
- new LeasePeriodPolicy.Result[entries.length];
- final EntryHolder[] holders = new EntryHolder[entries.length];
- final EntryHandle[] handles = new EntryHandle[entries.length];
-
- // Set expiration before adding to tables
- for (int i=0; i<entries.length; i++) {
- final EntryRep entry = entries[i];
- leaseData[i] =
- grant(entry, leaseTimes[i], entryLeasePolicy, "entryLeasePolicy");
- holders[i] = contents.holderFor(entry);
- handles[i] = new EntryHandle(entry, txn, holders[i]);
- }
-
- // Verify that the transaction is still active. Lock it so
- // nobody can change it behind our backs while where making
- // this important determination. If there is no txn, just do it.
- try {
- if (txn != null)
- txn.ensureActive();
-
- // At this point nothing can go wrong, log this addition.
- if (log != null)
- log.writeOp(entries, (txn == null) ? null : txn.getId());
-
- for (int i=0; i<handles.length; i++) {
- synchronized (handles[i]) {
- addWrittenRep(handles[i], holders[i], txn);
- recordTransition(
- new EntryTransition(handles[i], txn, true, true, true));
- }
- }
- } finally {
- if (txn != null)
- txn.allowStateChange();
- }
-
- if (opsLogger.isLoggable(Level.FINEST)) {
- opsLogger.log(Level.FINEST, "writing multiples (txn = {0})",
- new Object[]{txn});
- }
-
- final long[] rslt = new long[entries.length * 3];
- for (int i=0; i<entries.length; i++) {
- rslt[i] = leaseData[i].duration;
- rslt[i+1] = entries[i].id().getMostSignificantBits();
- rslt[i+2] = entries[i].id().getLeastSignificantBits();
- }
- return rslt;
- }
-
- /**
- * Add the written rep to the relevant data structures. This is
- * the common code between <code>write</code> and
- * <code>recoverWrite</code>.
- *
- * @see #write
- * @see #recoverWrite
- */
- private void addWrittenRep(EntryHandle handle, EntryHolder holder,
- Txn txn)
- {
- opsLogger.log(Level.FINEST, "OutriggerServerImpl: addWrittenRep");
-
- /*
- * Since we're adding it, update the type information. Extra
- * type information isn't very critical, but having an object
- * in the system which cannot be matched by all its
- * superclasses is Just Plain Wrong. We do this first in case
- * of a crash between the two operations (We could do it all
- * in one synchronized wad, but that seems unnecessary,
- * considering the small overhead of extra type info that will
- * likely reappear as the entry that failed to be successfully
- * added due to the crash gets added again later).
- */
- types.addTypes(handle.rep());
-
- /*
- * Now we can add the entry, safe in the knowledge that it can be
- * found by any of its superclasses
- */
- holder.add(handle, txn);
- }
-
- /**
- * Records a transition in the visibility of
- * an entry. This method should be called <em>after</em>
- * the transition has been made visible in
- * <code>contents</code> (including any subsidiary
- * objects such as the appropriate <code>EntryHandle</code>).
- * <p>
- * Currently we only post transitions that increase the visibility
- * of an entry, or those that resolve a lock. We don't post
- * transitions that reflect the locking of an entry or straight
- * takes. Some of the watchers exploit this fact, so if we ever
- * start to post transitions take and/or new lock transition
- * the watcher may need to be updated.
- *
- * @param transition an object describing the visibility
- * transition of an entry.
- * @throws NullPointerException if <code>transition</code>
- * is <code>null</code> or if <code>transition.getEntry</code>
- * is <code>null</code>. */
- void recordTransition(EntryTransition transition) {
- operationJournal.recordTransition(transition);
- }
-
- /**
- * Queue up an event for delivery.
- * @param sender An object that on request will
- * attempt to deliver its event
- * to the associated listener.
- * @throws NullPointerException if <code>sender</code> is
- * <code>null</code>
- */
- void enqueueDelivery(EventSender sender) {
- notifier.enqueueDelivery(sender);
- }
-
- /**
- * Atomically check to see if the passed entry can be read/taken by
- * the specified operation using the specified transaction and if
- * it can read/take it and return <code>true</code>, otherwise
- * return <code>false</code>. If the entry can not be read/taken
- * because of transaction conflicts the conflicting transaction(s)
- * will be added to the list of transactions to monitor.
- *
- * @param handle The <code>EntryHandle</code> of the entry
- * the caller wants to read/take.
- * @param txn If non-null the transaction to perform
- * this operation under. Note, if non-null and this
- * is not active <code>false</code> will be returned.
- * @param takeIt <code>true</code> if the caller is trying
- * take the passed entry, <code>false</code>
- * otherwise.
- * @param lockedEntrySet If the entry can not be read/taken
- * because of a transaction conflict and the value of
- * this argument is non-null, the ID of the entry
- * will be added to this set. This method assumes
- * that any concurrent access to the set is being
- * arbitrated by the set or by the caller of this method.
- * @param provisionallyRemovedEntrySet If the entry can not be
- * read/taken because it has been provisionally
- * removed then its handle will be placed in the
- * passed <code>WeakHashMap</code> as a key (with
- * null as the value). May be <code>null</code> in
- * which case provisionally removed entries will not
- * be recorded. This method assumes that any
- * concurrent access is being arbitrated by the set
- * or by the caller.
- * @param now an estimate of the current time in milliseconds
- * since the beginning of the epoch.
- * @param watcher The <code>QueryWatcher</code> requesting
- * capture of the entry.
- * @return <code>true</code> if the entry could be read/taken and
- * <code>false</code> otherwise.
- * @throws NullPointerException if entry is <code>null</code>.
- */
- boolean attemptCapture(EntryHandle handle, TransactableMgr txn,
- boolean takeIt, Set lockedEntrySet,
- WeakHashMap provisionallyRemovedEntrySet, long now,
- QueryWatcher watcher)
- {
- final EntryHolder holder = contents.holderFor(handle.rep());
- final Set conflictSet = new java.util.HashSet();
- if (holder.attemptCapture(handle, txn, takeIt,
- conflictSet, lockedEntrySet,
- provisionallyRemovedEntrySet, now))
- return true;
-
- monitor(watcher, conflictSet);
- return false;
- }
-
- // purposefully inherit doc comment from supertype
- public EventRegistration
- notify(EntryRep tmpl, Transaction tr, RemoteEventListener listener,
- long leaseTime, MarshalledObject handback)
- throws TransactionException, RemoteException
- {
- opsLogger.entering("OutriggerServerImpl", "notify");
-
- typeCheck(tmpl);
-
- checkForNull(listener, "Passed null listener for event registration");
-
- listener =
- (RemoteEventListener)listenerPreparer.prepareProxy(listener);
-
- final long currentOrdinal = operationJournal.currentOrdinal();
-
- // Register a watcher for this client
- tmpl = setupTmpl(tmpl);
-
- ServerTransaction str = serverTransaction(tr);
- Txn txn = enterTxn(tr);
-
- final Uuid cookie = UuidFactory.generate();
- final long eventID = nextID();
- final long now = System.currentTimeMillis();
-
- final EventRegistrationWatcher reg;
- if (txn == null) {
- reg = new StorableEventWatcher(now, currentOrdinal, cookie,
- handback, eventID, listener);
- } else {
- reg = new TransactableEventWatcher(now, currentOrdinal, cookie,
- handback, eventID, listener, txn);
- }
-
- // Get the expiration times
- grant(reg, leaseTime, eventLeasePolicy, "eventLeasePolicy");
-
- /* Add to eventRegistrations here so reg will be in the map
- * before it can ever try and remove itself.
- */
- eventRegistrations.put(cookie, reg);
-
- /* Prevent transaction state change while we do this.
- * If there is no txn, just do it.
- */
- if (txn != null) {
- try {
- txn.ensureActive();
- templates.add(reg, tmpl);
- txn.add((Transactable)reg);
- } finally {
- txn.allowStateChange();
- }
- } else {
- // log before adding to templates
- if (log != null)
- log.registerOp((StorableResource)reg,
- "StorableEventWatcher",
- new StorableObject[]{tmpl});
-
- templates.add(reg, tmpl);
- }
-
- return new EventRegistration(eventID, spaceProxy,
- leaseFactory.newLease(cookie, reg.getExpiration()),
- 0);
- }
-
-
- public EventRegistration registerForAvailabilityEvent(EntryRep[] tmpls,
- Transaction tr, boolean visibilityOnly, RemoteEventListener listener,
- long leaseTime, MarshalledObject handback)
- throws TransactionException, RemoteException
- {
- opsLogger.entering("OutriggerServerImpl", "registeForAvailabilityEvent");
-
- checkForNull(listener, "Passed null listener for event registration");
- checkForEmpty(tmpls, "Must provide at least one template");
-
- // eventLeasePolicy.grant allo ws 0 length leases
- if (leaseTime == 0) {
- throw logAndThrowIllegalArg("leaseTime must be non-zero");
- }
-
- listener =
- (RemoteEventListener)listenerPreparer.prepareProxy(listener);
- final long currentOrdinal = operationJournal.currentOrdinal();
-
- ServerTransaction str = serverTransaction(tr);
- Txn txn = enterTxn(tr);
-
- for (int i=0; i<tmpls.length; i++) {
- typeCheck(tmpls[i]);
- }
-
- for (int i=0; i<tmpls.length; i++) {
- tmpls[i] = setupTmpl(tmpls[i]);
- }
-
- final Uuid cookie = UuidFactory.generate();
- final long eventID = nextID();
- final long now = System.currentTimeMillis();
-
- final AvailabilityRegistrationWatcher reg;
- if (txn == null) {
- reg = new StorableAvailabilityWatcher(now, currentOrdinal, cookie,
- visibilityOnly, handback, eventID, listener);
- } else {
- reg = new TransactableAvailabilityWatcher(now, currentOrdinal,
- cookie, visibilityOnly, handback, eventID, listener, txn);
- }
-
- // Get the expiration time
- grant(reg, leaseTime, eventLeasePolicy, "eventLeasePolicy");
-
- /* Add to eventRegistrations here so reg will be in the map
- * before it can ever try and remove itself.
- */
- eventRegistrations.put(cookie, reg);
-
- if (txn != null) {
- try {
- txn.ensureActive();
- for (int i=0; i<tmpls.length; i++) {
- templates.add(reg, tmpls[i]);
- }
- txn.add((Transactable)reg);
- } finally {
- txn.allowStateChange();
- }
- } else {
- // log before adding to templates
- if (log != null)
- log.registerOp((StorableResource)reg,
- "StorableAvailabilityWatcher",
- tmpls);
-
- for (int i=0; i<tmpls.length; i++) {
- templates.add(reg, tmpls[i]);
- }
- }
-
- return new EventRegistration(eventID, spaceProxy,
- leaseFactory.newLease(cookie, reg.getExpiration()),
- 0);
- }
-
-
- /**
- * Remove the passed <code>EventRegistrationRecord</code> from
- * event registrations map.
- * @param reg The <code>EventRegistrationRecord</code> object to remove.
- * @throws NullPointerException if <code>reg</code> is <code>null</code>.
- */
- void removeEventRegistration(EventRegistrationRecord reg) {
- eventRegistrations.remove(reg.getCookie());
- }
-
- private EntryRep setupTmpl(EntryRep tmpl) {
- /*
- * If the notification is for a null template, use the special
- * matchAnyEntryRep. When the notification system adds a
- * record for this EntryRep, any time any incoming EntryRep
- * enters the space, the notification system will
- * successfully match it and send the notification.
- */
- if (tmpl == null)
- tmpl = EntryRep.matchAnyEntryRep();
-
- /*
- * This entry may be a new type if the notification request
- * arrives before any entry of the type is written.
- */
- types.addTypes(tmpl);
-
- return tmpl;
- }
-
- // purposefully inherit doc comment from supertype
- public void cancel(Uuid cookie) throws UnknownLeaseException {
- leaseLogger.entering("OutriggerServerImpl","cancel");
-
- // Look for entry leases first
- final EntryHandle handle = contents.handleFor(cookie);
- if (handle != null) {
- synchronized (handle) {
- if (handle.removed())
- throw throwNewUnknownLeaseException(cookie);
-
- if (handle.isProvisionallyRemoved()) {
- try {
- handle.waitOnCompleteRemoval();
- } catch (InterruptedException e) {
- // should never happen
- throw new AssertionError(e);
- }
- throw throwNewUnknownLeaseException(cookie);
- }
- handle.provisionallyRemove();
- }
-
- cancelOp(cookie, false);
-
- synchronized (handle) {
- contents.remove(handle);
- }
-
- return;
- }
-
-
- final EventRegistrationRecord reg =
- (EventRegistrationRecord)eventRegistrations.get(cookie);
- if (reg != null && reg.cancel()) {
- return;
- /* Think before adding anything here, if you do
- * other places where registrations get killed will not
- * do everything they need.
- */
- }
-
- final ContentsQuery contentsQuery =
- (ContentsQuery)contentsQueries.get(cookie);
- if (contentsQuery != null && contentsQuery.cancel()) {
- return;
- }
-
- throw throwNewUnknownLeaseException(cookie);
- }
-
- // purposefully inherit doc comment from supertype
- public long renew(Uuid cookie, long extension)
- throws UnknownLeaseException, LeaseDeniedException
- {
- leaseLogger.entering("OutriggerServerImpl","renew");
-
- LeasedResource resource;
- LeasePeriodPolicy policy;
-
- if (null != (resource = contents.getLeasedResource(cookie)))
- policy = entryLeasePolicy;
- else if (null !=
- (resource = (LeasedResource)eventRegistrations.get(cookie)))
- policy = eventLeasePolicy;
- else if (null !=
- (resource = (LeasedResource)contentsQueries.get(cookie)))
- policy = contentsLeasePolicy;
- else
- throw throwNewUnknownLeaseException(cookie);
-
- synchronized (resource) {
- if (resource.getExpiration() <= System.currentTimeMillis()) {
- // Lease has expired, don't renew
- throw throwNewUnknownLeaseException(cookie);
- }
-
- // No one can expire the lease, so it is safe to update.
- final LeasePeriodPolicy.Result r =
- policy.renew(resource, extension);
-
- if (log != null)
- log.renewOp((Uuid)cookie, r.expiration);
-
- resource.setExpiration(r.expiration);
-
- if (leaseLogger.isLoggable(Level.FINER)) {
- leaseLogger.log(Level.FINER, "renew({0},{1}) returns {2}",
- new Object[]{cookie, Long.valueOf(extension),
- Long.valueOf(r.duration)});
- }
-
- return r.duration;
- }
- }
-
- // purposefully inherit doc comment
- public Landlord.RenewResults renewAll(Uuid[] cookies,
- long[] extensions)
- {
- leaseLogger.entering("OutriggerServerImpl","renewAll");
-
- if (leaseLogger.isLoggable(Level.FINER)) {
- leaseLogger.log(Level.FINER, "renewAll:{0} leases",
- Long.valueOf(cookies.length));
- }
-
- return LandlordUtil.renewAll(this, cookies, extensions);
- }
-
- // purposefully inherit doc comment
- public Map cancelAll(Uuid[] cookies) {
- leaseLogger.entering("OutriggerServerImpl", "cancelAll");
- return LandlordUtil.cancelAll(this, cookies);
- }
-
-
- public Object read(EntryRep tmpl, Transaction txn, long timeout,
- QueryCookie cookie)
- throws TransactionException, RemoteException, InterruptedException
- {
- if (opsLogger.isLoggable(Level.FINER)) {
- opsLogger.log(Level.FINER,
- "read:tmpl = {0}, timeout = {1}, cookie = {2}",
- new Object[]{tmpl, Long.valueOf(timeout), cookie});
- }
- return getMatch(tmpl, txn, timeout, false, false, cookie);
- }
-
- public Object take(EntryRep tmpl, Transaction txn, long timeout,
- QueryCookie cookie)
- throws TransactionException, RemoteException, InterruptedException
- {
- if (opsLogger.isLoggable(Level.FINER)) {
- opsLogger.log(Level.FINER,
- "take:tmpl = {0}, timeout = {1}, cookie = {2}",
- new Object[]{tmpl, Long.valueOf(timeout), cookie});
- }
- return getMatch(tmpl, txn, timeout, true, false, cookie);
- }
-
- public Object readIfExists(EntryRep tmpl, Transaction txn, long timeout,
- QueryCookie cookie)
- throws TransactionException, RemoteException, InterruptedException
- {
- if (opsLogger.isLoggable(Level.FINER)) {
- opsLogger.log(Level.FINER,
- "readIfExists:tmpl = {0}, timeout = {1}, cookie = {2}",
- new Object[]{tmpl, Long.valueOf(timeout), cookie});
- }
- return getMatch(tmpl, txn, timeout, false, true, cookie);
- }
-
- public Object takeIfExists(EntryRep tmpl, Transaction txn, long timeout,
- QueryCookie cookie)
- throws TransactionException, RemoteException, InterruptedException
- {
- if (opsLogger.isLoggable(Level.FINER)) {
- opsLogger.log(Level.FINER,
- "takeIfExists:tmpl = {0}, timeout = {1}, cookie = {2}",
- new Object[]{tmpl, Long.valueOf(timeout), cookie});
- }
- return getMatch(tmpl, txn, timeout, true, true, cookie);
- }
-
- public Object take(EntryRep[] tmpls, Transaction tr, long timeout,
- int limit, QueryCookie queryCookieFromClient)
- throws TransactionException, RemoteException
- {
- if (opsLogger.isLoggable(Level.FINER)) {
- opsLogger.log(Level.FINER,
- "take<multiple>:timeout = {1}, limit{2} = cookie = {3}",
- new Object[]{Long.valueOf(timeout), Integer.valueOf(limit),
- queryCookieFromClient});
- }
-
- checkForEmpty(tmpls, "Must provide at least one template");
-
- for (int i=0; i<tmpls.length; i++) {
- typeCheck(tmpls[i]);
- if (tmpls[i] == null)
- tmpls[i] = EntryRep.matchAnyEntryRep();
- }
-
- checkLimit(limit);
- checkTimeout(timeout);
-
- ServerTransaction str = serverTransaction(tr);
- Txn txn = enterTxn(tr);
-
- /* Don't bother if transaction is not active,
- * synchronize so we get the most recent answer,
- * not because we need to make anything atomic.
- */
- if (txn != null) {
- synchronized(txn) {
- if (txn.getState() != ACTIVE)
- throw throwNewCannotJoinException();
- }
- }
-
- final long start = System.currentTimeMillis();
- final long endTime;
- if (Long.MAX_VALUE - timeout <= start)
- endTime = Long.MAX_VALUE; // If we would overflow, pin to MAX_VALUE
- else
- endTime = start + timeout;
-
- final OutriggerQueryCookie queryCookie;
- if (queryCookieFromClient == null ||
- !(queryCookieFromClient instanceof OutriggerQueryCookie))
- {
- queryCookie = new OutriggerQueryCookie(start);
- } else {
- queryCookie = (OutriggerQueryCookie)queryCookieFromClient;
- }
-
- /* This plugs a hole -- if an entry is written while we are
- * scanning the lists, it may arrive after we have passed that
- * point in the list. By capturing the last current element
- * operationJournal and searching forward from their after we
- * do the first traversal we can make sure that we get any
- * entries the initial search missed
- */
- final OperationJournal.TransitionIterator transitionIterator =
- operationJournal.newTransitionIterator();
-
- /* First we do a straight search of the entries currently in the space */
-
- // Set of classes we need to search
- final Set classes = new java.util.HashSet();
- for (int i=0; i<tmpls.length; i++) {
- final String whichClass = tmpls[i].classFor();
- final Iterator subtypes = types.subTypes(whichClass);
- while (subtypes.hasNext()) {
- classes.add(subtypes.next());
- }
- }
-
- limit = Math.min(limit, takeLimit);
- EntryHandle[] handles = new EntryHandle[limit];
- int found = 0;
- final Set conflictSet = new java.util.HashSet();
- final WeakHashMap provisionallyRemovedEntrySet = new WeakHashMap();
-
- for (Iterator i=classes.iterator();
- i.hasNext() && found < handles.length;)
- {
- final String clazz = (String)i.next();
- final EntryHolder.ContinuingQuery query =
- createQuery(tmpls, clazz, txn, true, start);
-
- if (query == null)
- continue;
-
- while (found < handles.length) {
- final EntryHandle handle =
- query.next(conflictSet, null, provisionallyRemovedEntrySet);
- if (handle == null)
- break;
- handles[found++] = handle;
- }
- }
-
- if (found > 0) {
- // We have something to return
- return completeTake(handles, found, txn);
- }
-
- final long time = System.currentTimeMillis();
-
- if (time >= endTime) {
- /* Even if this query won't block, check up on any
- * txns which prevented this query from returning reps
- */
- monitor(conflictSet);
-
- // need to return `null', but make sure conflicting takes have
- // completed
- try {
- waitOnProvisionallyRemovedEntries(provisionallyRemovedEntrySet);
- } catch (InterruptedException e) {
- // should never happen
- throw new AssertionError(e);
- }
-
- return queryCookie;
- }
-
- /* Now we look through the transitions that have occurred
- * since we started.
- *
- * First register a watcher.
- */
- final long startOrdinal =
- transitionIterator.currentOrdinalAtCreation();
- final TakeMultipleWatcher watcher = new TakeMultipleWatcher(limit, endTime,
- queryCookie.startTime, startOrdinal, provisionallyRemovedEntrySet,
- txn);
-
- /* If this query is under a transaction, make sure it still
- * active and add the watcher to the Txn. Do this before
- * anything else in case the transaction is no longer active.
- */
- if (txn != null) {
- synchronized(txn) {
- if (txn.getState() != ACTIVE)
- throw throwNewCannotJoinException();
-
- txn.add((Transactable)watcher);
- }
- }
-
- monitor(watcher, conflictSet);
- for (int i=0; i<tmpls.length; i++) {
- templates.add(watcher, tmpls[i]);
- }
-
- /* Now we need to search the operation journal for all
- * the transitions that were added after we started
- * our initial search and that were processed before
- * we added our watcher to templates.
- */
- transitionIterator.watcherRegistered();
-
- transitions:
- for (EntryTransition i = transitionIterator.next();
- i != null;
- i = transitionIterator.next())
- {
- final EntryRep rep = i.getHandle().rep();
-
- for (int j=0; j<tmpls.length; j++) {
- final EntryRep tmpl = tmpls[j];
- if (rep.isAtLeastA(tmpl.classFor()) && tmpl.matches(rep)) {
- // Match! - Need to process
- if (watcher.catchUp(i, time)) {
- break transitions;
- }
- }
- }
- }
-
-
- try {
- watcher.waitOnResolution();
- } catch (InterruptedException e) {
- // should never happen
- throw new AssertionError(e);
- }
-
- handles = watcher.resolvedWithEntries();
- if (handles != null) { // got some
- return completeTake(handles, handles.length, txn);
- }
-
- final Throwable t = watcher.resolvedWithThrowable();
- if (t != null) {
- if (opsLogger.isLoggable(Levels.FAILED))
- opsLogger.log(Levels.FAILED, t.getMessage(), t);
-
- if (t instanceof RemoteException)
- throw (RemoteException)t;
-
- if (t instanceof TransactionException)
- throw (TransactionException)t;
-
- if (t instanceof RuntimeException)
- throw (RuntimeException)t;
-
- if (t instanceof Error)
- throw (Error)t;
-
- throw new InternalSpaceException(
- "Query threw unexpected exception", t);
- }
-
- /* Before returning nothing, make sure all pending removals
- * that we conflicted with have been logged.
- */
- try {
- waitOnProvisionallyRemovedEntries(provisionallyRemovedEntrySet);
- } catch (InterruptedException e) {
- // should never happen
- throw new AssertionError(e);
- }
-
- return queryCookie;
- }
-
- private EntryRep[] completeTake(EntryHandle[] handles, int found, Txn txn)
- throws TransactionException
- {
- final EntryRep[] reps = new EntryRep[found];
-
- if (log == null) {
- for (int i=0; i<found; i++) {
- reps[i] = handles[i].rep();
- }
- } else {
- final Uuid[] uuids = new Uuid[found];
- for (int i=0; i<found; i++) {
- final EntryRep rep = handles[i].rep();
- reps[i] = rep;
- uuids[i] = rep.id();
- }
-
- if (txn == null) {
- log.takeOp(uuids, null);
- } else {
- /* Make sure the txn is still active, we don't want
- * to write a transactional take recored after
- * writing out the prepare record.
- */
- try {
- txn.ensureActive();
- log.takeOp(uuids, txn.getId());
- } finally {
- txn.allowStateChange();
- }
- }
- }
-
- // Now that takes have been committed to disk, let other queries go
- if (txn == null) {
- for (int i=0; i<found; i++) {
- synchronized (handles[i]) {
- contents.remove(handles[i]);
- }
- }
- }
-
- return reps;
- }
-
- private EntryRep completeTake(EntryHandle handle, Txn txn)
- throws TransactionException
- {
- final EntryRep rep = handle.rep();
-
- if (log != null) {
- if (txn == null) {
- log.takeOp(rep.id(), null);
- } else {
- /* Make sure the txn is still active, we don't want
- * to write a transactional take recored after
- * writing out the prepare record.
- */
- try {
- txn.ensureActive();
- log.takeOp(rep.id(), txn.getId());
- } finally {
- txn.allowStateChange();
- }
- }
- }
-
- // Now that takes have been committed to disk, let other queries go
- if (txn == null) {
- synchronized (handle) {
- contents.remove(handle);
- }
- }
-
- return rep;
- }
-
- /**
- * Crerate a ContinuingQuery for the holder of the specified class.
- */
- private EntryHolder.ContinuingQuery createQuery(EntryRep[] tmpls,
- String clazz, Txn txn, boolean takeIt, long now)
- {
- final EntryHolder holder = contents.holderFor(clazz);
- final String[] supertypes = holder.supertypes();
-
- if (supertypes == null)
- return null;
-
- final List tmplsToCheck = new java.util.LinkedList();
- for (int i=0; i<tmpls.length; i++) {
- final EntryRep tmpl = tmpls[i];
- final String tmplClass = tmpl.classFor();
- if (tmplClass.equals(clazz) || tmpl == EntryRep.matchAnyEntryRep()) {
- tmplsToCheck.add(tmpl);
- } else {
- for (int j=0; j<supertypes.length; j++) {
- if (tmplClass.equals(supertypes[j])) {
- tmplsToCheck.add(tmpl);
- break;
- }
- }
- }
- }
-
- return holder.continuingQuery(
- (EntryRep[])tmplsToCheck.toArray(new EntryRep[tmplsToCheck.size()]),
- txn, takeIt, now);
- }
-
-
- /**
- * Call <code>waitOnCompleteRemoval</code> on each of the EntryHandles
- * in the passed set.
- */
- private static void waitOnProvisionallyRemovedEntries(
- WeakHashMap provisionallyRemovedEntrySet)
- throws InterruptedException
- {
- if (provisionallyRemovedEntrySet.isEmpty())
- return;
-
- final Set keys = provisionallyRemovedEntrySet.keySet();
-
- for (Iterator i=keys.iterator(); i.hasNext();) {
- final EntryHandle handle = (EntryHandle)i.next();
- if (handle == null)
- continue;
- synchronized (handle) {
- handle.waitOnCompleteRemoval();
- }
- }
- }
-
-
- /**
- * Do the heavy lifting for queries. Find a match,
- * optionally taking it, blocking as appropriate
- * if a match can't be initially found.
- * @param tmpl The template for the query, may be <code>null</code>
- * if all entries match.
- * @param tr The transaction the query is being performed under,
- * or <code>null</code> if there is no transaction.
- * @param timeout Maxium time to block in milliseconds.
- * @param takeIt <code>true</code> if the entry found is
- * to be removed.
- * @param ifExists <code>true</code> if this query is to follow
- * the rules for ifExists queries.
- * @param queryCookieFromClient If this call is a continuation of
- * an earlier query, the cookie from the
- * last sub-query.
- * @throws RemoteException if a network failure occurs.
- * @throws TransactionException if there is a problem
- * with the specified transaction such as
- * it can not be joined, or leaves the active
- * state before the call is complete.
- * @throws InterruptedException if the thread in the server
- * is interrupted before the query can be completed.
- * @throws SecurityException if the server decides
- * the caller has insufficient privilege to carry
- * out the operation.
- * @throws IllegalArgumentException if a negative timeout value is used
- * @throws InternalSpaceException if there is an internal problem
- * with the server.
- */
- private Object
- getMatch(EntryRep tmpl, Transaction tr, long timeout, boolean takeIt,
- boolean ifExists, QueryCookie queryCookieFromClient)
- throws RemoteException, InterruptedException, TransactionException
- {
- typeCheck(tmpl);
- checkTimeout(timeout);
-
- // after this time, can stop looking
- final long startTime = System.currentTimeMillis();
- final long endTime; // when should we stop?
- if (Long.MAX_VALUE - timeout <= startTime)
- endTime = Long.MAX_VALUE; // If we would overflow, pin to MAX_VALUE
- else
- endTime = startTime + timeout;
-
- // We get the txn object here, but we'll not check the validity
- // of the transaction state until later...
-
- Txn txn = enterTxn(tr);
-
- // Take a quick peek to see if the transaction has already closed.
- // If it has, then bail. NB: We must do this check again when we've
- // found a candidate match (much deeper in the code).
-
- if (txn != null) {
- synchronized(txn) {
- if (txn.getState() != ACTIVE)
- throw throwNewCannotJoinException();
- }
- }
-
- /* This plugs a hole -- if an entry is written while getMatch
- * is scanning the list, it may arrive after getMatch has
- * passed that point in the list. By capturing the last
- * current element operationJournal and searching forward from
- * there after we do the first traversal, we can make sure that
- * we get any entries the initial search missed
- */
- final OperationJournal.TransitionIterator transitionIterator =
- operationJournal.newTransitionIterator();
-
- // We use a distinguished EntryRep for the null template
- if (tmpl == null)
- tmpl = EntryRep.matchAnyEntryRep();
-
- EntryHandle handle = null;
- final Set conflictSet = new java.util.HashSet();
- final Set lockedEntrySet =
- (ifExists?new java.util.HashSet():null);
- final WeakHashMap provisionallyRemovedEntrySet =
- new java.util.WeakHashMap();
-
- /*
- * First we do the straight search
- */
- handle = find(tmpl, txn, takeIt, conflictSet, lockedEntrySet,
- provisionallyRemovedEntrySet);
- opsLogger.log(Level.FINEST, "getMatch, initial search found {0}", handle);
-
- if (handle != null) { // found it
- if (takeIt)
- return completeTake(handle, txn);
- else
- return handle.rep();
- }
-
- if (opsLogger.isLoggable(Level.FINEST)) {
- opsLogger.log(Level.FINEST, "{0} conflicts, endTime = {1}",
- new Object[] {Integer.valueOf(conflictSet.size()),
- Long.valueOf(endTime)});
- }
-
- final OutriggerQueryCookie queryCookie;
- if (queryCookieFromClient == null ||
- !(queryCookieFromClient instanceof OutriggerQueryCookie))
- {
- queryCookie = new OutriggerQueryCookie(startTime);
- } else {
- queryCookie = (OutriggerQueryCookie)queryCookieFromClient;
- }
-
-
- final long time = System.currentTimeMillis();
- if (time >= endTime) {
- /* Even if this query won't block, check up on any
- * txns which prevented this query from returning a rep
- */
- monitor(conflictSet);
- // Make sure the removal of any provisionally removed
- // entries are committed to disk before failing the query
- waitOnProvisionallyRemovedEntries(provisionallyRemovedEntrySet);
- return queryCookie;
- }
-
- /* Even if there are no conflicts and this is an ifExists
- * query we still need to go through the transitions that
- * occurred after the initial search started. This is because
- * we allow transitions to occur during the initial search and
- * because the initial search is does not process the current
- * state in the order it was created. For example there could
- * be a match in holder 2, but we search on holder 1 first
- * where there is no match. before we get to holder 2 a match
- * is added to holder 1 and the matching entry is removed from
- * holder 2. During this time there was always at least one
- * match in the space, but this initial search will turn up 0.
- *
- * By waiting until we go through the transition queue before
- * deciding there are no matches we know that any match that
- * was in the space when started the search, removed before we
- * got to it and was replaced we will see the
- * replacement. Once we get caught up we process the transitions
- * in their natural order.
- */
-
- /* Now we look through the transitions that have occurred
- * since we started.
- *
- * First register a watcher.
- */
- final SingletonQueryWatcher watcher;
- final long startOrdinal =
- transitionIterator.currentOrdinalAtCreation();
- if (!ifExists && !takeIt && txn == null) {
- watcher =
- new ReadWatcher(endTime, queryCookie.startTime,
- startOrdinal);
- } else if (ifExists && !takeIt) {
- if (txn == null) {
- watcher = new ReadIfExistsWatcher(endTime, queryCookie.startTime,
- startOrdinal, lockedEntrySet);
- } else {
- watcher = new TransactableReadIfExistsWatcher(endTime,
- queryCookie.startTime, startOrdinal, lockedEntrySet,
- provisionallyRemovedEntrySet, txn);
- }
- } else if (!ifExists && (takeIt || txn != null)) {
- watcher = new ConsumingWatcher(endTime, queryCookie.startTime,
- startOrdinal, provisionallyRemovedEntrySet, txn, takeIt);
- } else if (ifExists && takeIt) {
- watcher = new TakeIfExistsWatcher(endTime,
- queryCookie.startTime, startOrdinal, lockedEntrySet,
- provisionallyRemovedEntrySet, txn);
- } else {
- throw new AssertionError("Can't create watcher for query");
- }
-
- /* If this query is under a transaction, make sure it still
- * active and add the watcher to the Txn. Do this before
- * anything else in case the transaction is no longer active.
- */
- if (txn != null) {
- synchronized(txn) {
- if (txn.getState() != ACTIVE)
- throw throwNewCannotJoinException();
-
- txn.add((Transactable)watcher);
- }
- }
-
- monitor(watcher, conflictSet);
-
- templates.add(watcher, tmpl);
-
- /* Now we need to search the operation journal for all
- * the transitions that were added after we started
- * our initial search and that were processed before
- * we added our watcher to templates.
- */
- transitionIterator.watcherRegistered();
- final String tmplClass = tmpl.classFor();
-
- for (EntryTransition i = transitionIterator.next();
- i != null;
- i = transitionIterator.next())
- {
- final EntryRep rep = i.getHandle().rep();
- if (rep.isAtLeastA(tmplClass) && tmpl.matches(rep)) {
- // Match! - Need to process
- if (watcher.catchUp(i, time)) {
- break;
- }
- }
- }
-
- /* If this is an ifExists query need to mark in the journal
- * that once the current tail of journal is processed an empty
- * locked entry set means we should return.
- */
- if (ifExists)
- operationJournal.markCaughtUp((IfExistsWatcher)watcher);
-
- watcher.waitOnResolution();
- handle = watcher.resolvedWithEntry();
- if (handle != null) { // got one
- if (takeIt)
- return completeTake(handle, txn);
- else
- return handle.rep();
- }
-
- final Throwable t = watcher.resolvedWithThrowable();
- if (t != null) {
- if (opsLogger.isLoggable(Levels.FAILED))
- opsLogger.log(Levels.FAILED, t.getMessage(), t);
-
- if (t instanceof RemoteException)
- throw (RemoteException)t;
-
- if (t instanceof InterruptedException)
- throw (InterruptedException)t;
-
- if (t instanceof TransactionException)
- throw (TransactionException)t;
-
- if (t instanceof RuntimeException)
- throw (RuntimeException)t;
-
- if (t instanceof Error)
- throw (Error)t;
-
- throw new InternalSpaceException(
- "Query threw unexpected exception", t);
- }
-
- // Before returning nothing, make sure all pending removal
- // have been logged.
- waitOnProvisionallyRemovedEntries(provisionallyRemovedEntrySet);
- if (ifExists && ((IfExistsWatcher)watcher).isLockedEntrySetEmpty())
- return null;
-
- return queryCookie;
- }
-
- /**
- * Make sure the transactions listed here are monitored for as
- * long as the given query exists.
- */
- private void monitor(QueryWatcher watcher, Collection toMonitor) {
- if (!toMonitor.isEmpty())
- txnMonitor.add(watcher, toMonitor);
- }
-
- /**
- * Make sure the transactions listed here are monitored for
- * a reasonable amount of time since they recently caused a conflict,
- * although for a non-leased event.
- */
- void monitor(Collection toMonitor) {
- if (!toMonitor.isEmpty())
- txnMonitor.add(toMonitor);
- }
-
- /**
- * Debug method: Dump out the bucket for the holder for the given
- * rep.
- */
- void dump(String name, EntryRep rep) {
- dump(contents.holderFor(rep), name, rep);
- }
-
- /**
- * Debug method: Dump out the bucket for the given holder, for an
- * operation using the given rep.
- */
- static void dump(EntryHolder holder, String name, EntryRep rep) {
- try {
- holder.dump(name + " " + rep.entry());
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- /**
- * Find an entry that is at least <code>whichClass</code> that
- * matches the template <code>tmplRep</code> (at least the
- * template's type, and matches in values provided).
- * <code>whichClass</code> is at least the type of the template.
- * If <code>takeIt</code> is <code>true</code> the matching entry
- * is removed (perhaps provisionally).
- */
- private EntryHandle
- find(EntryRep tmplRep, Txn txn, boolean takeIt, Set conflictSet,
- Set lockedEntrySet, WeakHashMap provisionallyRemovedEntrySet)
- throws TransactionException
- {
- final String whichClass = tmplRep.classFor();
-
- /*
- * The iterator returned by subTypes includes both this class
- * and all subtypes.
- */
- Iterator subtypes = types.subTypes(whichClass);
- String className = null;
- EntryHandle result = null;
- boolean foundConflicts = false;
- EntryHolder holder = null;
-
- while (subtypes.hasNext()) {
- className = (String) subtypes.next();
- opsLogger.log(Level.FINEST,
- "OutriggerServerImpl: find: className = {0}", className);
-
- holder = contents.holderFor(className);
- result = holder.hasMatch(tmplRep, txn, takeIt, conflictSet,
- lockedEntrySet, provisionallyRemovedEntrySet);
- if (result != null) {
- return result;
- }
- }
-
- // no result found
- return null;
- }
-
- /**
- * Generate a new ID. IDs are generated from a
- * <code>SecureRandom</code> object so that it is next to
- * impossible to forge an ID and so we don't have
- * to remember/restore a counter cross restarts.
- */
- static long nextID() {
- return idGen.nextLong();
- }
-
- // ------------------------------------------------------------
- // Contents method and related classes
- // ------------------------------------------------------------
- public MatchSetData contents(EntryRep[] tmpls, Transaction tr,
- long leaseTime, long limit)
- throws TransactionException, RemoteException
- {
- if (opsLogger.isLoggable(Level.FINER)) {
- opsLogger.log(Level.FINER,
- "contents:tmpls = {0}, tr = {1}, leaseTime = {2}, " +
- "limit = {3}",
- new Object[]{tmpls, tr, Long.valueOf(leaseTime), Long.valueOf(limit)});
- }
-
- checkForEmpty(tmpls, "Must provide at least one template");
- checkLimit(limit);
-
- for (int i=0; i<tmpls.length; i++) {
- typeCheck(tmpls[i]);
- if (tmpls[i] == null)
- tmpls[i] = EntryRep.matchAnyEntryRep();
- }
-
- /* Explicit check of leastTime, contentsLeasePolicy.grant
- * won't check for leaseTime == 0, and we don't call grant at
- * all if we decide we don't need a lease.
- */
- if (leaseTime < 1 && leaseTime != Lease.ANY) {
- throw logAndThrowIllegalArg(
- "leaseTime = " + leaseTime + ", must be postive or Lease.ANY");
- }
-
- ServerTransaction str = serverTransaction(tr);
- Txn txn = enterTxn(tr);
-
- /* Don't bother if transaction is not active,
- * synchronize so we get the most recent answer,
- * not because we need to make anything atomic.
- */
- if (txn != null) {
- synchronized(txn) {
- if (txn.getState() != ACTIVE)
- throw throwNewCannotJoinException();
- }
- }
-
- final Uuid uuid = UuidFactory.generate();
[... 5547 lines stripped ...]