You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@felix.apache.org by cz...@apache.org on 2012/05/23 10:28:44 UTC

svn commit: r1341782 - in /felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl: handler/EventAdminImpl.java handler/EventHandlerProxy.java tasks/AsyncDeliverTasks.java tasks/SyncDeliverTasks.java

Author: cziegeler
Date: Wed May 23 08:28:43 2012
New Revision: 1341782

URL: http://svn.apache.org/viewvc?rev=1341782&view=rev
Log:
FELIX-3518 : Implement EventAdmin 1.3 (WiP)

Modified:
    felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventAdminImpl.java
    felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventHandlerProxy.java
    felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/AsyncDeliverTasks.java
    felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java

Modified: felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventAdminImpl.java
URL: http://svn.apache.org/viewvc/felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventAdminImpl.java?rev=1341782&r1=1341781&r2=1341782&view=diff
==============================================================================
--- felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventAdminImpl.java (original)
+++ felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventAdminImpl.java Wed May 23 08:28:43 2012
@@ -18,7 +18,9 @@
  */
 package org.apache.felix.eventadmin.impl.handler;
 
-import org.apache.felix.eventadmin.impl.tasks.*;
+import org.apache.felix.eventadmin.impl.tasks.AsyncDeliverTasks;
+import org.apache.felix.eventadmin.impl.tasks.DefaultThreadPool;
+import org.apache.felix.eventadmin.impl.tasks.SyncDeliverTasks;
 import org.osgi.framework.BundleContext;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
@@ -54,12 +56,12 @@ public class EventAdminImpl implements E
      * @param asyncPool The asynchronous thread pool
      */
     public EventAdminImpl(
-            final BundleContext bundleContext,
-            final DefaultThreadPool syncPool,
-            final DefaultThreadPool asyncPool,
-            final int timeout,
-            final String[] ignoreTimeout,
-            final boolean requireTopic)
+                    final BundleContext bundleContext,
+                    final DefaultThreadPool syncPool,
+                    final DefaultThreadPool asyncPool,
+                    final int timeout,
+                    final String[] ignoreTimeout,
+                    final boolean requireTopic)
     {
         checkNull(syncPool, "syncPool");
         checkNull(asyncPool, "asyncPool");
@@ -109,7 +111,7 @@ public class EventAdminImpl implements E
      */
     public void sendEvent(final Event event)
     {
-        m_sendManager.execute(this.getTracker().getHandlers(event), event);
+        m_sendManager.execute(this.getTracker().getHandlers(event), event, false);
     }
 
     /**
@@ -125,8 +127,8 @@ public class EventAdminImpl implements E
      * Update the event admin with new configuration.
      */
     public void update(final int timeout,
-            final String[] ignoreTimeout,
-            final boolean requireTopic)
+                    final String[] ignoreTimeout,
+                    final boolean requireTopic)
     {
         this.tracker.close();
         this.tracker.update(ignoreTimeout, requireTopic);

Modified: felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventHandlerProxy.java
URL: http://svn.apache.org/viewvc/felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventHandlerProxy.java?rev=1341782&r1=1341781&r2=1341782&view=diff
==============================================================================
--- felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventHandlerProxy.java (original)
+++ felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventHandlerProxy.java Wed May 23 08:28:43 2012
@@ -18,10 +18,17 @@
  */
 package org.apache.felix.eventadmin.impl.handler;
 
+import java.util.Collection;
+
 import org.apache.felix.eventadmin.impl.security.PermissionsUtil;
 import org.apache.felix.eventadmin.impl.util.LogWrapper;
-import org.osgi.framework.*;
-import org.osgi.service.event.*;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.Filter;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
 
 /**
  * This is a proxy for event handlers. It gets the real event handler
@@ -55,28 +62,31 @@ public class EventHandlerProxy {
     /** Use timeout. */
     private boolean useTimeout;
 
-	/**
-	 * Create an EventHandlerProxy.
+    /** Deliver async ordered. */
+    private boolean asyncOrderedDelivery;
+
+    /**
+     * Create an EventHandlerProxy.
      *
      * @param context The handler context
-	 * @param reference Reference to the EventHandler
-	 */
-	public EventHandlerProxy(final EventHandlerTracker.HandlerContext context,
-	        final ServiceReference reference)
-	{
-	    this.handlerContext = context;
-		this.reference = reference;
-	}
-
-	/**
-	 * Update the state with current properties from the service
-	 * @return <code>true</code> if the handler configuration is valid.
-	 */
-	public boolean update()
-	{
-	    this.blacklisted = false;
-		boolean valid = true;
-		// First check, topic
+     * @param reference Reference to the EventHandler
+     */
+    public EventHandlerProxy(final EventHandlerTracker.HandlerContext context,
+                    final ServiceReference reference)
+    {
+        this.handlerContext = context;
+        this.reference = reference;
+    }
+
+    /**
+     * Update the state with current properties from the service
+     * @return <code>true</code> if the handler configuration is valid.
+     */
+    public boolean update()
+    {
+        this.blacklisted = false;
+        boolean valid = true;
+        // First check, topic
         final Object topicObj = reference.getProperty(EventConstants.EVENT_TOPIC);
         if (topicObj instanceof String)
         {
@@ -126,11 +136,11 @@ public class EventHandlerProxy {
                 reason = "Neither of type String nor String[] : " + topicObj.getClass().getName();
             }
             LogWrapper.getLogger().log(
-                    this.reference,
-                    LogWrapper.LOG_WARNING,
-                    "Invalid EVENT_TOPICS : " + reason + " - Ignoring ServiceReference ["
-                        + this.reference + " | Bundle("
-                        + this.reference.getBundle() + ")]");
+                            this.reference,
+                            LogWrapper.LOG_WARNING,
+                            "Invalid EVENT_TOPICS : " + reason + " - Ignoring ServiceReference ["
+                                            + this.reference + " | Bundle("
+                                            + this.reference.getBundle() + ")]");
             this.topics = null;
             valid = false;
         }
@@ -149,39 +159,90 @@ public class EventHandlerProxy {
                 {
                     valid = false;
                     LogWrapper.getLogger().log(
-                            this.reference,
-                            LogWrapper.LOG_WARNING,
-                            "Invalid EVENT_FILTER - Ignoring ServiceReference ["
-                                + this.reference + " | Bundle("
-                                + this.reference.getBundle() + ")]", e);
+                                    this.reference,
+                                    LogWrapper.LOG_WARNING,
+                                    "Invalid EVENT_FILTER - Ignoring ServiceReference ["
+                                                    + this.reference + " | Bundle("
+                                                    + this.reference.getBundle() + ")]", e);
                 }
             }
             else if ( filterObj != null )
             {
                 valid = false;
                 LogWrapper.getLogger().log(
-                        this.reference,
-                        LogWrapper.LOG_WARNING,
-                        "Invalid EVENT_FILTER - Ignoring ServiceReference ["
-                            + this.reference + " | Bundle("
-                            + this.reference.getBundle() + ")]");
+                                this.reference,
+                                LogWrapper.LOG_WARNING,
+                                "Invalid EVENT_FILTER - Ignoring ServiceReference ["
+                                                + this.reference + " | Bundle("
+                                                + this.reference.getBundle() + ")]");
             }
         }
         this.filter = handlerFilter;
 
+        // new in 1.3 - deliver
+        this.asyncOrderedDelivery = true;
+        Object delivery = reference.getProperty(EventConstants.EVENT_DELIVERY);
+        if ( delivery instanceof Collection )
+        {
+            delivery = ((Collection)delivery).toArray(new String[((Collection)delivery).size()]);
+        }
+        if ( delivery instanceof String )
+        {
+            this.asyncOrderedDelivery =  !(EventConstants.DELIVERY_ASYNC_UNORDERED.equals(delivery.toString()));
+        }
+        else if ( delivery instanceof String[] )
+        {
+            final String[] deliveryArray = (String[])delivery;
+            boolean foundOrdered = false, foundUnordered = false;
+            for(int i=0; i<deliveryArray.length; i++)
+            {
+                final String value = deliveryArray[i];
+                if ( EventConstants.DELIVERY_ASYNC_UNORDERED.equals(value) )
+                {
+                    foundUnordered = true;
+                }
+                else if ( EventConstants.DELIVERY_ASYNC_ORDERED.equals(value) )
+                {
+                    foundOrdered = true;
+                }
+                else
+                {
+                    LogWrapper.getLogger().log(
+                                    this.reference,
+                                    LogWrapper.LOG_WARNING,
+                                    "Invalid EVENT_DELIVERY - Ignoring invalid value for event delivery property " + value + " of ServiceReference ["
+                                                    + this.reference + " | Bundle("
+                                                    + this.reference.getBundle() + ")]");
+
+                }
+            }
+            if ( !foundOrdered && foundUnordered )
+            {
+                this.asyncOrderedDelivery = false;
+            }
+        }
+        else if ( delivery != null )
+        {
+            LogWrapper.getLogger().log(
+                            this.reference,
+                            LogWrapper.LOG_WARNING,
+                            "Invalid EVENT_DELIVERY - Ignoring event delivery property " + delivery + " of ServiceReference ["
+                                            + this.reference + " | Bundle("
+                                            + this.reference.getBundle() + ")]");
+        }
         // make sure to release the handler
         this.release();
 
         return valid;
-	}
+    }
 
-	/**
-	 * Dispose the proxy and release the handler
-	 */
-	public void dispose()
-	{
-	    this.release();
-	}
+    /**
+     * Dispose the proxy and release the handler
+     */
+    public void dispose()
+    {
+        this.release();
+    }
 
     /**
      * Get the event handler.
@@ -206,12 +267,12 @@ public class EventHandlerProxy {
     }
 
     /**
-	 * Release the handler
-	 */
-	private synchronized void release()
-	{
-		if ( this.handler != null )
-		{
+     * Release the handler
+     */
+    private synchronized void release()
+    {
+        if ( this.handler != null )
+        {
             try
             {
                 this.handlerContext.bundleContext.ungetService(this.reference);
@@ -221,17 +282,17 @@ public class EventHandlerProxy {
                 // event handler might be stopped - ignore
             }
             this.handler = null;
-		}
-	}
+        }
+    }
 
-	/**
-	 * Get the topics of this handler.
-	 * If this handler matches all topics <code>null</code> is returned
-	 */
-	public String[] getTopics()
-	{
-	    return this.topics;
-	}
+    /**
+     * Get the topics of this handler.
+     * If this handler matches all topics <code>null</code> is returned
+     */
+    public String[] getTopics()
+    {
+        return this.topics;
+    }
 
     /**
      * Check if this handler is allowed to receive the event
@@ -278,6 +339,14 @@ public class EventHandlerProxy {
     }
 
     /**
+     * Should async events be delivered in order?
+     */
+    public boolean isAsyncOrderedDelivery()
+    {
+        return this.asyncOrderedDelivery;
+    }
+
+    /**
      * Check the timeout configuration for this handler.
      */
     private void checkTimeout(final String className)
@@ -300,43 +369,43 @@ public class EventHandlerProxy {
     }
 
     /**
-	 * Send the event.
-	 */
-	public void sendEvent(final Event event)
-	{
-		final EventHandler handlerService = this.obtain();
-		if (handlerService == null)
-		{
-			return;
-		}
-
-		try
-		{
-			handlerService.handleEvent(event);
-		}
-		catch (final Throwable e)
-		{
+     * Send the event.
+     */
+    public void sendEvent(final Event event)
+    {
+        final EventHandler handlerService = this.obtain();
+        if (handlerService == null)
+        {
+            return;
+        }
+
+        try
+        {
+            handlerService.handleEvent(event);
+        }
+        catch (final Throwable e)
+        {
             // The spec says that we must catch exceptions and log them:
             LogWrapper.getLogger().log(
-                this.reference,
-                LogWrapper.LOG_WARNING,
-                "Exception during event dispatch [" + event + " | "
-                    + this.reference + " | Bundle("
-                    + this.reference.getBundle() + ")]", e);
-		}
-	}
-
-	/**
-	 * Blacklist the handler.
-	 */
-	public void blackListHandler()
-	{
+                            this.reference,
+                            LogWrapper.LOG_WARNING,
+                            "Exception during event dispatch [" + event + " | "
+                                            + this.reference + " | Bundle("
+                                            + this.reference.getBundle() + ")]", e);
+        }
+    }
+
+    /**
+     * Blacklist the handler.
+     */
+    public void blackListHandler()
+    {
         LogWrapper.getLogger().log(
-                LogWrapper.LOG_WARNING,
-                "Blacklisting ServiceReference [" + this.reference + " | Bundle("
-                    + this.reference.getBundle() + ")] due to timeout!");
+                        LogWrapper.LOG_WARNING,
+                        "Blacklisting ServiceReference [" + this.reference + " | Bundle("
+                                        + this.reference.getBundle() + ")] due to timeout!");
         this.blacklisted = true;
         // we can free the handler now.
         this.release();
-	}
+    }
 }

Modified: felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/AsyncDeliverTasks.java
URL: http://svn.apache.org/viewvc/felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/AsyncDeliverTasks.java?rev=1341782&r1=1341781&r2=1341782&view=diff
==============================================================================
--- felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/AsyncDeliverTasks.java (original)
+++ felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/AsyncDeliverTasks.java Wed May 23 08:28:43 2012
@@ -18,8 +18,14 @@
  */
 package org.apache.felix.eventadmin.impl.tasks;
 
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 
+import org.apache.felix.eventadmin.impl.handler.EventHandlerProxy;
 import org.osgi.service.event.Event;
 
 /**
@@ -63,24 +69,42 @@ public class AsyncDeliverTasks
      */
     public void execute(final Collection tasks, final Event event)
     {
-        final Thread currentThread = Thread.currentThread();
-        TaskExecuter executer = null;
-        synchronized (m_running_threads )
+        final Iterator i = tasks.iterator();
+        boolean hasOrdered = false;
+        while ( i.hasNext() )
         {
-            TaskExecuter runningExecutor = (TaskExecuter)m_running_threads.get(currentThread);
-            if ( runningExecutor != null )
+            final EventHandlerProxy task = (EventHandlerProxy)i.next();
+            if ( !task.isAsyncOrderedDelivery() )
             {
-                runningExecutor.add(tasks, event);
+                // do something
             }
             else
             {
-                executer = new TaskExecuter( tasks, event, currentThread );
-                m_running_threads.put(currentThread, executer);
+                hasOrdered = true;
             }
+
         }
-        if ( executer != null )
+        if ( hasOrdered )
         {
-            m_pool.executeTask(executer);
+            final Thread currentThread = Thread.currentThread();
+            TaskExecuter executer = null;
+            synchronized (m_running_threads )
+            {
+                final TaskExecuter runningExecutor = (TaskExecuter)m_running_threads.get(currentThread);
+                if ( runningExecutor != null )
+                {
+                    runningExecutor.add(tasks, event);
+                }
+                else
+                {
+                    executer = new TaskExecuter( tasks, event, currentThread );
+                    m_running_threads.put(currentThread, executer);
+                }
+            }
+            if ( executer != null )
+            {
+                m_pool.executeTask(executer);
+            }
         }
     }
 
@@ -106,7 +130,7 @@ public class AsyncDeliverTasks
                 {
                     tasks = (Object[]) m_tasks.remove(0);
                 }
-                m_deliver_task.execute((Collection)tasks[0], (Event)tasks[1]);
+                m_deliver_task.execute((Collection)tasks[0], (Event)tasks[1], true);
                 synchronized ( m_running_threads )
                 {
                     running = m_tasks.size() > 0;

Modified: felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java
URL: http://svn.apache.org/viewvc/felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java?rev=1341782&r1=1341781&r2=1341782&view=diff
==============================================================================
--- felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java (original)
+++ felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java Wed May 23 08:28:43 2012
@@ -101,7 +101,7 @@ public class SyncDeliverTasks
      * @param tasks The event handler dispatch tasks to execute
      *
      */
-    public void execute(final Collection tasks, final Event event)
+    public void execute(final Collection tasks, final Event event, final boolean filterAsyncUnordered)
     {
         final Thread sleepingThread = Thread.currentThread();
         final SyncThread syncThread = sleepingThread instanceof SyncThread ? (SyncThread)sleepingThread : null;
@@ -110,62 +110,64 @@ public class SyncDeliverTasks
         while ( i.hasNext() )
         {
             final EventHandlerProxy task = (EventHandlerProxy)i.next();
-
-            if ( !useTimeout(task) )
-            {
-                // no timeout, we can directly execute
-                task.sendEvent(event);
-            }
-            else if ( syncThread != null )
+            if ( !filterAsyncUnordered || task.isAsyncOrderedDelivery() )
             {
-                // if this is a cascaded event, we directly use this thread
-                // otherwise we could end up in a starvation
-                final long startTime = System.currentTimeMillis();
-                task.sendEvent(event);
-                if ( System.currentTimeMillis() - startTime > this.timeout )
+                if ( !useTimeout(task) )
                 {
-                    task.blackListHandler();
+                    // no timeout, we can directly execute
+                    task.sendEvent(event);
                 }
-            }
-            else
-            {
-                final Rendezvous startBarrier = new Rendezvous();
-                final Rendezvous timerBarrier = new Rendezvous();
-                this.pool.executeTask(new Runnable()
+                else if ( syncThread != null )
                 {
-                    public void run()
+                    // if this is a cascaded event, we directly use this thread
+                    // otherwise we could end up in a starvation
+                    final long startTime = System.currentTimeMillis();
+                    task.sendEvent(event);
+                    if ( System.currentTimeMillis() - startTime > this.timeout )
                     {
-                        try
-                        {
-                            // notify the outer thread to start the timer
-                            startBarrier.waitForRendezvous();
-                            // execute the task
-                            task.sendEvent(event);
-                            // stop the timer
-                            timerBarrier.waitForRendezvous();
-                        }
-                        catch (final IllegalStateException ise)
+                        task.blackListHandler();
+                    }
+                }
+                else
+                {
+                    final Rendezvous startBarrier = new Rendezvous();
+                    final Rendezvous timerBarrier = new Rendezvous();
+                    this.pool.executeTask(new Runnable()
+                    {
+                        public void run()
                         {
-                            // this can happen on shutdown, so we ignore it
+                            try
+                            {
+                                // notify the outer thread to start the timer
+                                startBarrier.waitForRendezvous();
+                                // execute the task
+                                task.sendEvent(event);
+                                // stop the timer
+                                timerBarrier.waitForRendezvous();
+                            }
+                            catch (final IllegalStateException ise)
+                            {
+                                // this can happen on shutdown, so we ignore it
+                            }
                         }
+                    });
+                    // we wait for the inner thread to start
+                    startBarrier.waitForRendezvous();
+
+                    // timeout handling
+                    // we sleep for the sleep time
+                    // if someone wakes us up it's the finished inner task
+                    try
+                    {
+                        timerBarrier.waitAttemptForRendezvous(this.timeout);
+                    }
+                    catch (final TimeoutException ie)
+                    {
+                        // if we timed out, we have to blacklist the handler
+                        task.blackListHandler();
                     }
-                });
-                // we wait for the inner thread to start
-                startBarrier.waitForRendezvous();
 
-                // timeout handling
-                // we sleep for the sleep time
-                // if someone wakes us up it's the finished inner task
-                try
-                {
-                    timerBarrier.waitAttemptForRendezvous(this.timeout);
-                }
-                catch (final TimeoutException ie)
-                {
-                    // if we timed out, we have to blacklist the handler
-                    task.blackListHandler();
                 }
-
             }
         }
     }