You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@felix.apache.org by fm...@apache.org on 2010/10/13 09:39:33 UTC

svn commit: r1022014 - in /felix/trunk/coordinator/src/main/java/org/apache/felix/coordination/impl: Activator.java CoordinationImpl.java CoordinationMgr.java CoordinatorImpl.java

Author: fmeschbe
Date: Wed Oct 13 07:39:32 2010
New Revision: 1022014

URL: http://svn.apache.org/viewvc?rev=1022014&view=rev
Log:
FELIX-2647 Further improvements:
  * JavaDoc
  * Thread Safety
  * Improved timeout support
  * Improved support to ensure Participants only participate in a single coordination

Modified:
    felix/trunk/coordinator/src/main/java/org/apache/felix/coordination/impl/Activator.java
    felix/trunk/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinationImpl.java
    felix/trunk/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinationMgr.java
    felix/trunk/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinatorImpl.java

Modified: felix/trunk/coordinator/src/main/java/org/apache/felix/coordination/impl/Activator.java
URL: http://svn.apache.org/viewvc/felix/trunk/coordinator/src/main/java/org/apache/felix/coordination/impl/Activator.java?rev=1022014&r1=1022013&r2=1022014&view=diff
==============================================================================
--- felix/trunk/coordinator/src/main/java/org/apache/felix/coordination/impl/Activator.java (original)
+++ felix/trunk/coordinator/src/main/java/org/apache/felix/coordination/impl/Activator.java Wed Oct 13 07:39:32 2010
@@ -20,24 +20,40 @@ package org.apache.felix.coordination.im
 
 import java.util.Hashtable;
 
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.felix.jmx.service.coordination.CoordinatorMBean;
 import org.apache.felix.service.coordination.Coordinator;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleActivator;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.Constants;
 import org.osgi.framework.ServiceFactory;
+import org.osgi.framework.ServiceReference;
 import org.osgi.framework.ServiceRegistration;
+import org.osgi.util.tracker.ServiceTracker;
 
 @SuppressWarnings("deprecation")
 public class Activator implements BundleActivator {
 
     private CoordinationMgr mgr;
 
+    private ServiceTracker mbeanServerTracker;
+
     private ServiceRegistration coordinatorService;
 
-    public void start(BundleContext context) throws Exception {
+    public void start(BundleContext context) {
         mgr = new CoordinationMgr();
 
+        try {
+            mbeanServerTracker = new MBeanServerTracker(context, mgr);
+            mbeanServerTracker.open();
+        } catch (MalformedObjectNameException e) {
+            // TODO log
+        }
+
         ServiceFactory factory = new CoordinatorFactory(mgr);
         Hashtable<String, String> props = new Hashtable<String, String>();
         props.put(Constants.SERVICE_DESCRIPTION,
@@ -47,12 +63,17 @@ public class Activator implements Bundle
             Coordinator.class.getName(), factory, props);
     }
 
-    public void stop(BundleContext context) throws Exception {
+    public void stop(BundleContext context) {
         if (coordinatorService != null) {
             coordinatorService.unregister();
             coordinatorService = null;
         }
 
+        if (mbeanServerTracker != null) {
+            mbeanServerTracker.close();
+            mbeanServerTracker = null;
+        }
+
         mgr.cleanUp();
     }
 
@@ -74,4 +95,42 @@ public class Activator implements Bundle
         }
 
     }
+
+    static final class MBeanServerTracker extends ServiceTracker {
+
+        private final CoordinationMgr mgr;
+
+        private final ObjectName objectName;
+
+        MBeanServerTracker(final BundleContext context,
+                final CoordinationMgr mgr) throws MalformedObjectNameException {
+            super(context, MBeanServer.class.getName(), null);
+            this.mgr = mgr;
+            this.objectName = new ObjectName(CoordinatorMBean.OBJECTNAME);
+        }
+
+        @Override
+        public Object addingService(ServiceReference reference) {
+            MBeanServer server = (MBeanServer) super.addingService(reference);
+
+            try {
+                server.registerMBean(mgr, objectName);
+            } catch (Exception e) {
+                // TODO: log
+            }
+
+            return server;
+        }
+
+        @Override
+        public void removedService(ServiceReference reference, Object service) {
+            try {
+                ((MBeanServer) service).unregisterMBean(objectName);
+            } catch (Exception e) {
+                // TODO: log
+            }
+
+            super.removedService(reference, service);
+        }
+    }
 }

Modified: felix/trunk/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinationImpl.java
URL: http://svn.apache.org/viewvc/felix/trunk/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinationImpl.java?rev=1022014&r1=1022013&r2=1022014&view=diff
==============================================================================
--- felix/trunk/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinationImpl.java (original)
+++ felix/trunk/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinationImpl.java Wed Oct 13 07:39:32 2010
@@ -20,8 +20,10 @@ package org.apache.felix.coordination.im
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.TimerTask;
 
 import org.apache.felix.service.coordination.Coordination;
 import org.apache.felix.service.coordination.Participant;
@@ -43,6 +45,8 @@ public class CoordinationImpl implements
         FAILED;
     }
 
+    private final CoordinationMgr mgr;
+
     private final long id;
 
     private final String name;
@@ -50,22 +54,40 @@ public class CoordinationImpl implements
     // TODO: timeout must be enforced
     private long timeOutInMs;
 
-    private State state;
+    /**
+     * Access to this field must be synchronized as long as the expected state
+     * is {@link State#ACTIVE}. Once the state has changed, further updates to
+     * this instance will not take place any more and the state will only be
+     * modified by the thread successfully setting the state to
+     * {@link State#TERMINATING}.
+     */
+    private volatile State state;
 
-    private boolean mustFail;
+    private int mustFail;
 
-    private boolean timeout;
+    private Throwable failReason;
 
     private ArrayList<Participant> participants;
 
     private HashMap<Class<?>, Object> variables;
 
-    public CoordinationImpl(final long id, final String name) {
+    private TimerTask timeoutTask;
+
+    private Thread initiatorThread;
+
+    public CoordinationImpl(final CoordinationMgr mgr, final long id,
+            final String name, final long defaultTimeOutInMs) {
+        this.mgr = mgr;
         this.id = id;
         this.name = name;
+        this.mustFail = 0;
         this.state = State.ACTIVE;
         this.participants = new ArrayList<Participant>();
         this.variables = new HashMap<Class<?>, Object>();
+        this.timeOutInMs = -defaultTimeOutInMs;
+        this.initiatorThread = Thread.currentThread();
+
+        scheduleTimeout(defaultTimeOutInMs);
     }
 
     public String getName() {
@@ -76,12 +98,35 @@ public class CoordinationImpl implements
         return this.id;
     }
 
-    void mustFail() {
-        this.mustFail = true;
+    void mustFail(final Throwable reason) {
+        this.mustFail = FAILED;
+        this.failReason = reason;
     }
 
+    /**
+     * Initiates a coordination timeout. Called from the timer task scheduled by
+     * the {@link #scheduleTimeout(long)} method.
+     * <p>
+     * This method is inteded to only be called from the scheduled timer task.
+     */
     void timeout() {
-        this.timeout = true;
+        // If a timeout happens, the coordination thread is set to always fail
+        this.mustFail = TIMEOUT;
+
+        // and interrupted and a small delay happens to allow the initiator to
+        // clean up by reacting on the interrupt. If the initiator can do this
+        // clean up normally, the end() method will return TIMEOUT.
+        try {
+            initiatorThread.interrupt();
+            Thread.sleep(500); // half a second for now
+        } catch (SecurityException se) {
+            // thrown by interrupt -- no need to wait if interrupt fails
+        } catch (InterruptedException ie) {
+            // someone interrupted us while delaying, just continue
+        }
+
+        // After this delay the coordination is forcefully failed.
+        CoordinationImpl.this.fail(null);
     }
 
     long getTimeOut() {
@@ -89,24 +134,12 @@ public class CoordinationImpl implements
     }
 
     public int end() throws IllegalStateException {
-        if (state == State.ACTIVE) {
-            int reason = OK;
-            if (mustFail || timeout) {
-                fail(new Exception());
-                reason = mustFail ? FAILED : TIMEOUT;
-            } else {
-                state = State.TERMINATING;
-                for (Participant part : participants) {
-                    try {
-                        part.ended(this);
-                    } catch (Exception e) {
-                        // TODO: log
-                        reason = PARTIALLY_ENDED;
-                    }
-                }
-                state = State.TERMINATED;
+        if (startTermination()) {
+            if (mustFail != 0) {
+                failInternal();
+                return mustFail;
             }
-            return reason;
+            return endInternal();
         }
 
         // already terminated
@@ -114,16 +147,9 @@ public class CoordinationImpl implements
     }
 
     public boolean fail(Throwable reason) {
-        if (state == State.ACTIVE) {
-            state = State.TERMINATING;
-            for (Participant part : participants) {
-                try {
-                    part.failed(this);
-                } catch (Exception e) {
-                    // TODO: log
-                }
-            }
-            state = State.FAILED;
+        if (startTermination()) {
+            this.failReason = reason;
+            failInternal();
             return true;
         }
         return false;
@@ -131,36 +157,182 @@ public class CoordinationImpl implements
 
     public boolean terminate() {
         if (state == State.ACTIVE) {
-            end();
-            return true;
+            try {
+                end();
+                return true;
+            } catch (IllegalStateException ise) {
+                // another thread might have started the termination just
+                // after the current thread checked the state but before the
+                // end() method called on this thread was able to change the
+                // state. Just ignore this exception and continue.
+            }
         }
         return false;
     }
 
+    /**
+     * Returns whether the coordination has ended in failure.
+     * <p>
+     * The return value of <code>false</code> may be a transient situation if
+     * the coordination is in the process of terminating due to a failure.
+     */
     public boolean isFailed() {
         return state == State.FAILED;
     }
 
     public void addTimeout(long timeOutInMs) {
+        if (this.timeOutInMs > 0) {
+            // already set, ignore
+        }
+
         this.timeOutInMs = timeOutInMs;
+        scheduleTimeout(timeOutInMs);
     }
 
+    /**
+     * Adds the participant to the end of the list of participants of this
+     * coordination.
+     * <p>
+     * This method blocks if the given participant is currently participating in
+     * another coordination.
+     * <p>
+     * Participants can only be added to a coordination if it is active.
+     *
+     * @throws org.apache.felix.service.coordination.CoordinationException if
+     *             the participant cannot currently participate in this
+     *             coordination
+     */
     public boolean participate(Participant p) {
-        if (state == State.ACTIVE) {
-            if (!participants.contains(p)) {
-                participants.add(p);
+
+        // ensure participant only pariticipates on a single coordination
+        // this blocks until the participant can participate or until
+        // a timeout occurrs (or a deadlock is detected)
+        mgr.lockParticipant(p, this);
+
+        // synchronize access to the state to prevent it from being changed
+        // while adding the participant
+        synchronized (this) {
+            if (state == State.ACTIVE) {
+                if (!participants.contains(p)) {
+                    participants.add(p);
+                }
+                return true;
             }
-            return true;
+            return false;
         }
-        return false;
     }
 
     public Collection<Participant> getParticipants() {
-        return new ArrayList<Participant>(participants);
+        // synchronize access to the state to prevent it from being changed
+        // while we create a copy of the participant list
+        synchronized (this) {
+            if (state == State.ACTIVE) {
+                return new ArrayList<Participant>(participants);
+            }
+        }
+
+        return Collections.<Participant> emptyList();
     }
 
     public Map<Class<?>, ?> getVariables() {
         return variables;
     }
 
+    /**
+     * If this coordination is still active, this method initiates the
+     * termination of the coordination by setting the state to
+     * {@value State#TERMINATING}, unregistering from the
+     * {@link CoordinationMgr} and ensuring there is no timeout task active any
+     * longer to timeout this coordination.
+     *
+     * @return <code>true</code> If the coordination was active and termination
+     *         can continue. If <code>false</code> is returned, the coordination
+     *         must be considered terminated (or terminating) in the current
+     *         thread and no further termination processing must take place.
+     */
+    private synchronized boolean startTermination() {
+        if (state == State.ACTIVE) {
+            state = State.TERMINATING;
+            mgr.unregister(this);
+            scheduleTimeout(-1);
+            return true;
+        }
+
+        // this coordination is not active any longer, nothing to do
+        return false;
+    }
+
+    /**
+     * Internal implemenation of successful termination of the coordination.
+     * <p>
+     * This method must only be called after the {@link #state} field has been
+     * set to {@link State#TERMINATING} and only be the method successfully
+     * setting this state.
+     *
+     * @return OK or PARTIALLY_ENDED depending on whether all participants
+     *         succeeded or some of them failed ending the coordination.
+     */
+    private int endInternal() {
+        int reason = OK;
+        for (Participant part : participants) {
+            try {
+                part.ended(this);
+            } catch (Exception e) {
+                // TODO: log
+                reason = PARTIALLY_ENDED;
+            }
+
+            // release the participant for other coordinations
+            mgr.releaseParticipant(part);
+        }
+        state = State.TERMINATED;
+        return reason;
+    }
+
+    /**
+     * Internal implemenation of coordination failure.
+     * <p>
+     * This method must only be called after the {@link #state} field has been
+     * set to {@link State#TERMINATING} and only be the method successfully
+     * setting this state.
+     */
+    private void failInternal() {
+        // consider failure reason (if not null)
+        for (Participant part : participants) {
+            try {
+                part.failed(this);
+            } catch (Exception e) {
+                // TODO: log
+            }
+
+            // release the participant for other coordinations
+            mgr.releaseParticipant(part);
+        }
+        state = State.FAILED;
+    }
+
+    /**
+     * Helper method for timeout scheduling. If a timer is currently scheduled
+     * it is canceled. If the new timeout value is a positive value a new timer
+     * is scheduled to fire of so many milliseconds from now.
+     *
+     * @param timeout The new timeout value
+     */
+    private void scheduleTimeout(final long timeout) {
+        if (timeoutTask != null) {
+            mgr.schedule(timeoutTask, -1);
+            timeoutTask = null;
+        }
+
+        if (timeout > 0) {
+            timeoutTask = new TimerTask() {
+                @Override
+                public void run() {
+                    CoordinationImpl.this.timeout();
+                }
+            };
+
+            mgr.schedule(timeoutTask, timeout);
+        }
+    }
 }

Modified: felix/trunk/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinationMgr.java
URL: http://svn.apache.org/viewvc/felix/trunk/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinationMgr.java?rev=1022014&r1=1022013&r2=1022014&view=diff
==============================================================================
--- felix/trunk/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinationMgr.java (original)
+++ felix/trunk/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinationMgr.java Wed Oct 13 07:39:32 2010
@@ -24,6 +24,8 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Stack;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
 
@@ -38,6 +40,18 @@ import org.apache.felix.service.coordina
 import org.apache.felix.service.coordination.CoordinationException;
 import org.apache.felix.service.coordination.Participant;
 
+/**
+ * The <code>CoordinationMgr</code> is the actual backend manager of all
+ * Coordinations created by the Coordinator implementation. The methods in this
+ * class fall into three categories:
+ * <ul>
+ * <li>Actual implementations of the Coordinator interface on behalf of the
+ * per-bundle Coordinator service instances</li>
+ * <li>Implementation of the CoordinatorMBean interface allowing JMX management
+ * of the coordinations</li>
+ * <li>Management support to timeout and cleanup coordinations</li>
+ * </ul>
+ */
 @SuppressWarnings("deprecation")
 public class CoordinationMgr implements CoordinatorMBean {
 
@@ -47,9 +61,30 @@ public class CoordinationMgr implements 
 
     private final Map<Long, CoordinationImpl> coordinations;
 
+    private final Map<Participant, CoordinationImpl> participants;
+
+    private final Timer coordinationTimer;
+
+    /**
+     * Default coordination timeout. Currently hard coded to be 30s (the
+     * specified minimum timeout). Should be made configurable, but not less
+     * than 30s.
+     */
+    private long defaultTimeOut = 30 * 1000L;
+
+    /**
+     * Wait at most 60 seconds for participant to be eligible for participation
+     * in a coordination.
+     *
+     * @see #singularizeParticipant(Participant, CoordinationImpl)
+     */
+    private long participationTimeOut = 60 * 1000L;
+
     CoordinationMgr() {
         ctr = new AtomicLong(-1);
         coordinations = new HashMap<Long, CoordinationImpl>();
+        participants = new HashMap<Participant, CoordinationImpl>();
+        coordinationTimer = new Timer("Coordination Timer", true);
     }
 
     void unregister(final CoordinationImpl c) {
@@ -61,25 +96,81 @@ public class CoordinationMgr implements 
     }
 
     void cleanUp() {
+        // terminate coordination timeout timer
+        coordinationTimer.purge();
+        coordinationTimer.cancel();
+
+        // terminate all active coordinations
         final Exception reason = new Exception();
         for (Coordination c : coordinations.values()) {
             c.fail(reason);
         }
         coordinations.clear();
+
+        // release all participants
+        participants.clear();
+    }
+
+    void configure(final long coordinationTimeout,
+            final long participationTimeout) {
+        this.defaultTimeOut = coordinationTimeout;
+        this.participationTimeOut = participationTimeout;
+    }
+
+    void schedule(final TimerTask task, final long delay) {
+        if (delay < 0) {
+            task.cancel();
+        } else {
+            coordinationTimer.schedule(task, delay);
+        }
     }
 
-    public Coordination create(String name) {
+    void lockParticipant(final Participant p, final CoordinationImpl c) {
+        synchronized (participants) {
+            // wait for participant to be released
+            long cutOff = System.currentTimeMillis() + participationTimeOut;
+            while (participants.containsKey(p)) {
+                try {
+                    participants.wait(participationTimeOut / 500);
+                } catch (InterruptedException ie) {
+                    // don't worry, just keep on waiting
+                }
+
+                // timeout waiting for participation
+                if (System.currentTimeMillis() > cutOff) {
+                    throw new CoordinationException(
+                        "Timed out waiting to join coordinaton", c.getName(),
+                        CoordinationException.TIMEOUT);
+                }
+            }
+
+            // lock participant into coordination
+            participants.put(p, c);
+        }
+    }
+
+    void releaseParticipant(final Participant p) {
+        synchronized (participants) {
+            participants.remove(p);
+            participants.notifyAll();
+        }
+    }
+
+    // ---------- Coordinator back end implementation
+
+    Coordination create(String name) {
         long id = ctr.incrementAndGet();
-        CoordinationImpl c = new CoordinationImpl(id, name);
+        CoordinationImpl c = new CoordinationImpl(this, id, name,
+            defaultTimeOut);
         coordinations.put(id, c);
         return c;
     }
 
-    public Coordination begin(String name) {
+    Coordination begin(String name) {
         return push(create(name));
     }
 
-    public Coordination push(Coordination c) {
+    Coordination push(Coordination c) {
         Stack<Coordination> stack = threadStacks.get();
         if (stack == null) {
             stack = new Stack<Coordination>();
@@ -88,7 +179,7 @@ public class CoordinationMgr implements 
         return stack.push(c);
     }
 
-    public Coordination pop() {
+    Coordination pop() {
         Stack<Coordination> stack = threadStacks.get();
         if (stack != null && !stack.isEmpty()) {
             return stack.pop();
@@ -96,7 +187,7 @@ public class CoordinationMgr implements 
         return null;
     }
 
-    public Coordination getCurrentCoordination() {
+    Coordination getCurrentCoordination() {
         Stack<Coordination> stack = threadStacks.get();
         if (stack != null && !stack.isEmpty()) {
             return stack.peek();
@@ -104,16 +195,16 @@ public class CoordinationMgr implements 
         return null;
     }
 
-    public boolean alwaysFail(Throwable reason) {
+    boolean alwaysFail(Throwable reason) {
         CoordinationImpl current = (CoordinationImpl) getCurrentCoordination();
         if (current != null) {
-            current.mustFail();
+            current.mustFail(reason);
             return true;
         }
         return false;
     }
 
-    public Collection<Coordination> getCoordinations() {
+    Collection<Coordination> getCoordinations() {
         ArrayList<Coordination> result = new ArrayList<Coordination>();
         Stack<Coordination> stack = threadStacks.get();
         if (stack != null) {
@@ -122,9 +213,7 @@ public class CoordinationMgr implements 
         return result;
     }
 
-    public boolean participate(Participant participant)
-            throws CoordinationException {
-        // TODO: check for multi-pariticipation and block
+    boolean participate(Participant participant) throws CoordinationException {
         Coordination current = getCurrentCoordination();
         if (current != null) {
             current.participate(participant);
@@ -133,8 +222,7 @@ public class CoordinationMgr implements 
         return false;
     }
 
-    public Coordination participateOrBegin(Participant ifActive) {
-        // TODO: check for multi-pariticipation and block
+    Coordination participateOrBegin(Participant ifActive) {
         Coordination current = getCurrentCoordination();
         if (current == null) {
             current = begin("implicit");

Modified: felix/trunk/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinatorImpl.java
URL: http://svn.apache.org/viewvc/felix/trunk/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinatorImpl.java?rev=1022014&r1=1022013&r2=1022014&view=diff
==============================================================================
--- felix/trunk/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinatorImpl.java (original)
+++ felix/trunk/coordinator/src/main/java/org/apache/felix/coordination/impl/CoordinatorImpl.java Wed Oct 13 07:39:32 2010
@@ -42,11 +42,6 @@ public class CoordinatorImpl implements 
         return mgr.create(name);
     }
 
-    void unregister(final CoordinationImpl c) {
-        // TODO: check permission
-        mgr.unregister(c);
-    }
-
     public Coordination begin(String name) {
         // TODO: check permission
         return mgr.begin(name);