You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@river.apache.org by Peter Firmstone <ji...@zeus.net.au> on 2013/04/22 13:02:21 UTC

Javaspace, RemoteEvent ordering and RetryTask.runAfter

I've been scouring the code for some time now, fixing thread visibility 
issues, no matter how insignificant they appear.

There are random test failures that relate to not receiving the expected 
number of events, or to a leased resource that is still available after 
expiry (also event related). If you wait about 20 seconds after lease 
expiry, the test always passes, but it doesn't always pass if the check 
delay is only 1 second.

These test failures don't appear to be a thread concurrency or 
synchronization issue; I'm just about sure of that now.

Look at the following two classes, does this look in any way 
idempotent?  No?

How do you synchronize over a network?

Shouldn't it be the responsibility of the recipient to ensure ordering?  
Hello, I'm missing a sequence number for an event?  What happens if the 
events arrive out of order?

Yes, this does appear to relate to RetryTask.runAfter.

You know what I'm thinking?

I'd like to release River without a JavaSpaces implementation. I mean, 
let's just get it out the door; we can fix and release Outrigger later.  
Only Outrigger-related tests are failing.

I was just looking through Blitz; it looks a lot cleaner than Outrigger.  Dan?

Peter.

package com.sun.jini.outrigger;

import java.io.IOException;
import java.rmi.RemoteException;
import net.jini.core.event.UnknownEventException;
import net.jini.security.ProxyPreparer;
import net.jini.space.JavaSpace;

/**
  * An <code>EventSender</code> encapsulates a remote event listener, a
  * handback, an event sequence number, and an event type (an event ID
  * and class of <code>RemoteEvent</code>). <code>EventSender</code>s
  * provide a method that attempts to deliver an event of the
  * encapsulated type with the encapsulated handback and sequence number
  * to the encapsulated listener.
  */
interface EventSender {
     /**
      * Send a remote event of the encapsulated type to the encapsulated
      * listener, with the encapsulated handback and sequence number.  No
      * locks should be held while calling the listener. This method may be
      * called more than once if all previous tries failed. This call may do
      * nothing and return normally if it is determined that delivering the
      * event is no longer useful. It is assumed that once this
      * method returns normally it will not be called again.
      *
      * @param source the source the event object
      *        sent to the listener should have.
      * @param now The current time.
      * @param preparer to apply to the listener if it has
      *        been recovered from a store and not yet re-prepared
      *        in this VM.
      * @throws IOException if the listener can not
      *         be unmarshalled. May throw {@link RemoteException}
      *         if the call to the listener or preparer does.
      * @throws ClassNotFoundException if the listener
      *         needs to be unmarshalled and a necessary
      *         class can not be found.
      * @throws UnknownEventException if the
      *         call to the listener does. Note, this
      *         will not cause the watcher to remove itself.
      * @throws SecurityException if the call to the listener does
      *         or if the listener needs to be prepared and
      *         the <code>prepareProxy</code> call does.
      * @throws RuntimeException if the call to the listener does.
      */
     public void sendEvent(JavaSpace source, long now, ProxyPreparer 
preparer)
     throws UnknownEventException, ClassNotFoundException, IOException;

     /**
      * Called when the event sending infrastructure decides
      * to give up on the event registration associated with
      * this sender.
      */
     public void cancelRegistration();

     /**
      * Return <code>true</code> if the passed <code>EventSender</code>
      * should run before this one, otherwise return <code>false</code>.
      * @param other the sender this object should compare itself to.
      * @return <code>true</code> if this object should run after
      * <code>other</code>.
      */
     public boolean runAfter(EventSender other);
}


package com.sun.jini.outrigger;

import java.io.IOException;
import java.rmi.RemoteException;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.jini.core.event.UnknownEventException;
import net.jini.config.Configuration;
import net.jini.config.ConfigurationException;
import net.jini.security.ProxyPreparer;
import net.jini.space.JavaSpace;

import com.sun.jini.constants.ThrowableConstants;
import com.sun.jini.config.Config;
import com.sun.jini.logging.Levels;
import com.sun.jini.thread.TaskManager;
import com.sun.jini.thread.RetryTask;
import com.sun.jini.thread.WakeupManager;

/**
 * The notifier.  This class is responsible for notifying objects for
 * which interest has been registered: each delivery is wrapped in a
 * retryable task and queued on a <code>TaskManager</code>.  It operates
 * in transient space as much as possible.  Pending notifications will be
 * lost when the server goes down, but registrations of interest
 * survive across server crashes for persistent servers.
 *
 * @author Sun Microsystems, Inc.
 *
 * @see JavaSpace#notify
 * @see OutriggerServerImpl#notify
 */
// @see NotifyChit
class Notifier implements com.sun.jini.constants.TimeConstants {
    /**
     * The object to use for the <code>source</code> when creating
     * events.
     */
    private final JavaSpace source;

    /** Proxy preparer to use on recovered listeners */
    private final ProxyPreparer recoveredListenerPreparer;

    /** Wakeup manager handed to every <code>NotifyTask</code> */
    private final WakeupManager wakeupMgr =
        new WakeupManager(new WakeupManager.ThreadDesc(null, true));

    /** Task manager holding the pending notification tasks */
    private final TaskManager pending;

    /**
     * Max times to retry: a failed delivery is abandoned (while keeping
     * the registration) once <code>attempt()</code> exceeds this value.
     */
    private final static int MAX_ATTEMPTS = 10;

    /** Logger for logging event related information */
    private static final Logger logger =
        Logger.getLogger(OutriggerServerImpl.eventLoggerName);

    /**
     * Create a notifier connected to the given <code>space</code>.
     * @param source the value to use for the <code>source</code> in
     *               remote event objects.
     * @param recoveredListenerPreparer <code>ProxyPreparer</code> to
     *               apply to recovered listeners.
     * @param config a source of configuration data.
     * @throws ConfigurationException if there is a problem
     *         with the passed configuration.
     * @throws NullPointerException if <code>source</code> or
     * <code>config</code> arguments are <code>null</code>.
     */
    Notifier(JavaSpace source, ProxyPreparer recoveredListenerPreparer,
             Configuration config)
        throws ConfigurationException
    {
        if (source == null)
            throw new NullPointerException("source must be non-null");
        this.source = source;

        this.recoveredListenerPreparer = recoveredListenerPreparer;

        // Use the configured task manager if there is one, otherwise
        // fall back to a default-constructed TaskManager.
        pending = (TaskManager)Config.getNonNullEntry(config,
            OutriggerServerImpl.COMPONENT_NAME, "notificationsTaskManager",
            TaskManager.class, new TaskManager());
    }

    /**
     * Terminate the notifier, shutting down any threads
     * it has running. This method can assume that
     * the constructor completed.
     */
    void terminate() {
        pending.terminate();
        wakeupMgr.stop();
        wakeupMgr.cancelAll();
    }

    /**
     * Queue up an event for delivery.
     * @param sender An object that on request will
     *               attempt to deliver its event
     *               to the associated listener.
     * @throws NullPointerException if <code>sender</code> is
     * <code>null</code>
     */
    void enqueueDelivery(EventSender sender) {
        pending.add(new NotifyTask(sender));
    }

    /*
     * Static stuff for Pending (can't put it in the class, unfortunately).
     */

    /**
     * Longest time, measured from the task's <code>startTime()</code>,
     * that delivery of a single event is attempted: one day.
     */
    private static final long MAX_TIME = 1 * DAYS;

    /**
     * Retry delays.  Declared as intervals and converted to elapsed time
     * since the start by the static initializer below.  Note: this array
     * is not read anywhere else in this class.
     */
    private static final long delays[] = {
        1 * SECONDS, 5 * SECONDS,
        10 * SECONDS, 60 * SECONDS, 60 * SECONDS
    };

    static {
        /*
         * Make the delays the amount of time since the start -- it
         * is easier to declare the intervals, but the elapsed time is
         * more <i>useful</i>.
         */
        for (int i = 1; i < delays.length; i++)
            delays[i] += delays[i - 1];
    }

    /**
     * A task that represents a notification of matching a particular
     * template under a given transaction.
     */
    private class NotifyTask extends RetryTask {
        /** Who and what to send an event to. */
        private final EventSender sender;

        /**
         * Create a task that will deliver the event held by
         * <code>sender</code>.
         * @param sender An object that on request will
         *               attempt to deliver its event
         *               to the associated listener.
         * @throws NullPointerException if <code>sender</code> is
         * <code>null</code>
         */
        NotifyTask(EventSender sender) {
            super(Notifier.this.pending, Notifier.this.wakeupMgr);
            if (sender == null)
                throw new NullPointerException("sender must be non-null");
            this.sender = sender;
        }

        /**
         * Try to notify the target.  Return <code>true</code> if the
         * notification was successful, or if no further attempt should
         * be made (delivery abandoned or registration cancelled);
         * return <code>false</code> to request another attempt.
         * <p>
         * We know that we are the only one dealing with the given chit
         * because <code>runAfter</code> makes sure of it.
         */
        public boolean tryOnce() {
            long curTime = System.currentTimeMillis();
            if (curTime - startTime() > MAX_TIME) {
                logGivingUp();
                return true;    // just stop here, we are declaring "success"
            }

            boolean successful = true;    // notification successful?
            try {
                sender.sendEvent(source, curTime, recoveredListenerPreparer);
            } catch (UnknownEventException e) {
                // they didn't want to know about this, so stop them getting
                // future notifications, too.
                logFailure("UnknownEventException", Level.FINER, true, e);
                sender.cancelRegistration();
                // this is still "successful" -- we know to stop sending this
            } catch (RemoteException e) {
                final int cat = ThrowableConstants.retryable(e);

                if (cat == ThrowableConstants.BAD_INVOCATION ||
                    cat == ThrowableConstants.BAD_OBJECT)
                {
                    // Listener probably bad, retry likely to fail.
                    logFailure("definite exception", Level.INFO, true, e);
                    sender.cancelRegistration();
                } else if (cat == ThrowableConstants.INDEFINITE) {
                    // try, try, again
                    logFailure("indefinite exception", Levels.FAILED,
                               false, e);
                    successful = false;
                } else if (cat == ThrowableConstants.UNCATEGORIZED) {
                    // Same as above but log differently.
                    logFailure("uncategorized exception", Level.INFO, false,
                               e);
                    successful = false;
                } else {
                    logger.log(Level.WARNING, "ThrowableConstants.retryable " +
                        "returned out of range value, " + cat,
                        new AssertionError(e));
                    successful = false;
                }
            } catch (IOException e) {
                // corrupted listener? unlikely to get better, cancel
                logFailure("IOException", Level.INFO, true, e);
                sender.cancelRegistration();
            } catch (ClassNotFoundException e) {
                // probably a codebase problem, retry
                logFailure("ClassNotFoundException", Levels.FAILED, false, e);
                successful = false;
            } catch (RuntimeException e) {
                /* bad listener, or preparer, either way unlikely to
                 * get better
                 */
                logFailure("RuntimeException", Level.INFO, true, e);
                sender.cancelRegistration();
            }

            if (!successful && attempt() > MAX_ATTEMPTS) {
                logGivingUp();
                return true;        // as successful as we're going to be
            }

            return successful;
        }

        /**
         * Return <code>true</code> if any <code>NotifyTask</code> among
         * the first <code>max</code> entries of <code>list</code> holds a
         * sender that this task's sender says it must run after.
         * Entries that are not <code>NotifyTask</code>s are ignored.
         *
         * @param list the tasks to compare against (presumably the
         *             tasks queued in the task manager -- confirm
         *             against the <code>TaskManager.Task</code> contract)
         * @param max  only entries with an index less than this value
         *             are examined
         */
        public boolean runAfter(java.util.List list, int max) {
            for (int i = 0; i < max; i++) {
                Object task = list.get(i);
                if (task instanceof NotifyTask) {
                    NotifyTask nt = (NotifyTask)task;
                    if (sender.runAfter(nt.sender))
                        return true;
                }
            }
            return false;
        }

        /**
         * Log that delivery of this event is being abandoned while the
         * registration itself is kept.
         */
        private void logGivingUp() {
            if (logger.isLoggable(Levels.FAILED)) {
                logger.log(Levels.FAILED,
                    "giving up on delivering event, keeping registration");
            }
        }

        /**
         * Log a failed delivery attempt.
         *
         * @param exceptionDescription short description of the failure,
         *        inserted into the log message
         * @param level    the level to log at
         * @param terminal <code>true</code> if the registration is being
         *        dropped, <code>false</code> if it is being kept
         * @param t        the failure to attach to the log record
         */
        private void logFailure(String exceptionDescription, Level level,
                                boolean terminal, Throwable t)
        {
            if (logger.isLoggable(level)) {
                // The leading space before "while" keeps the description
                // from running into the rest of the message.
                logger.log(level, "Encountered " + exceptionDescription +
                    " while preparing to send/sending event, " +
                    (terminal ? "dropping" : "keeping") + " registration", t);
            }
        }
    }
}