You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@felix.apache.org by cz...@apache.org on 2014/09/29 09:55:52 UTC

svn commit: r1628132 - in /felix/trunk/eventadmin/impl: ./ src/main/java/org/apache/felix/eventadmin/impl/handler/ src/main/java/org/apache/felix/eventadmin/impl/tasks/

Author: cziegeler
Date: Mon Sep 29 07:55:52 2014
New Revision: 1628132

URL: http://svn.apache.org/r1628132
Log:
FELIX-4638 - Less locking on event handler timing. Apply patch from Bob Paulin

Added:
    felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/BlacklistLatch.java   (with props)
    felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/HandlerTask.java   (with props)
Removed:
    felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/Rendezvous.java
Modified:
    felix/trunk/eventadmin/impl/changelog.txt
    felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventHandlerProxy.java
    felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java

Modified: felix/trunk/eventadmin/impl/changelog.txt
URL: http://svn.apache.org/viewvc/felix/trunk/eventadmin/impl/changelog.txt?rev=1628132&r1=1628131&r2=1628132&view=diff
==============================================================================
--- felix/trunk/eventadmin/impl/changelog.txt (original)
+++ felix/trunk/eventadmin/impl/changelog.txt Mon Sep 29 07:55:52 2014
@@ -2,6 +2,7 @@ Changes from 1.4.2 to 1.4.4
 ---------------------------
 ** Improvement
     * [FELIX-4623] - Add test for thread based ordering
+    * [FELIX-4638] - Less locking on event handler timing
 
 
 Changes from 1.4.0 to 1.4.2

Modified: felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventHandlerProxy.java
URL: http://svn.apache.org/viewvc/felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventHandlerProxy.java?rev=1628132&r1=1628131&r2=1628132&view=diff
==============================================================================
--- felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventHandlerProxy.java (original)
+++ felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/handler/EventHandlerProxy.java Mon Sep 29 07:55:52 2014
@@ -431,12 +431,15 @@ public class EventHandlerProxy {
      */
     public void blackListHandler()
     {
-        LogWrapper.getLogger().log(
-                        LogWrapper.LOG_WARNING,
-                        "Blacklisting ServiceReference [" + this.reference + " | Bundle("
-                                        + this.reference.getBundle() + ")] due to timeout!");
-        this.blacklisted = true;
-        // we can free the handler now.
-        this.release();
+    	if(!this.blacklisted)
+    	{
+	        LogWrapper.getLogger().log(
+	                        LogWrapper.LOG_WARNING,
+	                        "Blacklisting ServiceReference [" + this.reference + " | Bundle("
+	                                        + this.reference.getBundle() + ")] due to timeout!");
+	        this.blacklisted = true;
+	        // we can free the handler now.
+	        this.release();
+    	}
     }
 }

Added: felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/BlacklistLatch.java
URL: http://svn.apache.org/viewvc/felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/BlacklistLatch.java?rev=1628132&view=auto
==============================================================================
--- felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/BlacklistLatch.java (added)
+++ felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/BlacklistLatch.java Mon Sep 29 07:55:52 2014
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.felix.eventadmin.impl.tasks;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.felix.eventadmin.impl.util.LogWrapper;
+
+/**
+ *
+ * A latch that checks handlers for blacklisting on an interval.
+ *
+ */
+public class BlacklistLatch {
+
+	private final Semaphore internalSemaphore;
+
+	private final int count;
+
+	private final long timeout;
+
+	private final List<HandlerTask> handlerTasks;
+
+	/**
+	 * @param count Number of handlers that must call countdown
+	 * @param timeout Timeout in Milliseconds to check for blacklisting handlers
+	 */
+	public BlacklistLatch(final int count, final long timeout)
+	{
+		this.handlerTasks = new ArrayList<HandlerTask>(count);
+		this.count = count;
+		this.timeout = timeout;
+		internalSemaphore = new Semaphore(count);
+		internalSemaphore.drainPermits();
+	}
+
+	/**
+	 *
+	 * Count down the number of handlers blocking event completion.
+	 *
+	 */
+	public void countDown()
+	{
+		internalSemaphore.release();
+	}
+
+	/**
+	 *
+	 * Adds a handler task to the timeout based blackout checking.
+	 *
+	 * @param task
+	 */
+	public void addToBlacklistCheck(final HandlerTask task)
+	{
+		this.handlerTasks.add(task);
+	}
+
+	/**
+	 *
+	 * Causes current thread to wait until each handler has called countDown.
+	 * Checks on timeout interval to determine if a handler needs blacklisting.
+	 *
+	 */
+	public void awaitAndBlacklistCheck()
+	{
+		try
+        {
+        	while(!internalSemaphore.tryAcquire(this.count, this.timeout, TimeUnit.MILLISECONDS))
+            {
+            	final Iterator<HandlerTask> handlerTaskIt = handlerTasks.iterator();
+            	while(handlerTaskIt.hasNext())
+            	{
+            		HandlerTask currentTask = handlerTaskIt.next();
+            		currentTask.checkForBlacklist();
+            	}
+            }
+        }
+        catch (final InterruptedException e)
+        {
+        	LogWrapper.getLogger().log(
+                    LogWrapper.LOG_WARNING,
+                    "Event Task Processing Interrupted. Events may not be recieved in proper order.");
+        }
+	}
+
+
+}

Propchange: felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/BlacklistLatch.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/BlacklistLatch.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/BlacklistLatch.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/HandlerTask.java
URL: http://svn.apache.org/viewvc/felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/HandlerTask.java?rev=1628132&view=auto
==============================================================================
--- felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/HandlerTask.java (added)
+++ felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/HandlerTask.java Mon Sep 29 07:55:52 2014
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.felix.eventadmin.impl.tasks;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+
+import org.apache.felix.eventadmin.impl.handler.EventHandlerProxy;
+import org.osgi.service.event.Event;
+
+/**
+ * A task that processes an event handler
+ *
+ */
+public class HandlerTask implements Runnable
+{
+	private final EventHandlerProxy task;
+
+	private final Event event;
+
+	private final long timeout;
+
+	private final BlacklistLatch handlerLatch;
+
+	private volatile long threadId;
+
+	private volatile long startTime;
+
+	private volatile long endTime;
+
+	/**
+	 *
+	 *
+	 * @param task Proxy to the event handler
+	 * @param event The event to send to the handler
+	 * @param timeout Timeout for handler blacklisting
+	 * @param handlerLatch The latch used to ensure events fire in proper order
+	 */
+	public HandlerTask(final EventHandlerProxy task, final Event event, final long timeout, final BlacklistLatch handlerLatch)
+	{
+		this.task = task;
+		this.event = event;
+		this.timeout = timeout;
+		this.handlerLatch = handlerLatch;
+		this.threadId = -1l;
+		this.startTime = -1l;
+		this.endTime = -1l;
+	}
+
+	/**
+	 *
+	 * Perform timing based on thread CPU time with clock time fall back.
+	 *
+	 * @return
+	 */
+	public long getTimeInMillis()
+    {
+        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
+        return bean.isThreadCpuTimeEnabled() ?
+            bean.getThreadCpuTime(threadId)/1000000 : System.currentTimeMillis();
+    }
+
+	/**
+	 * Run Hander Event
+	 */
+    @Override
+    public void run()
+    {
+        try
+        {
+        	threadId = Thread.currentThread().getId();
+            startTime = getTimeInMillis();
+            // execute the task
+            task.sendEvent(event);
+            endTime = getTimeInMillis();
+            checkForBlacklist();
+        }
+        finally
+        {
+        	handlerLatch.countDown();
+        }
+    }
+
+    public void runWithoutBlacklistTiming()
+    {
+    	task.sendEvent(event);
+    	handlerLatch.countDown();
+    }
+
+    /**
+     * This method defines if a timeout handling should be used for the
+     * task.
+     */
+    public boolean useTimeout()
+    {
+        // we only check the proxy if a timeout is configured
+        if ( this.timeout > 0)
+        {
+            return task.useTimeout();
+        }
+        return false;
+    }
+
+    /**
+     * Check to see if we need to blacklist this handler
+     *
+     */
+    public void checkForBlacklist()
+    {
+    	if(useTimeout() && getTaskTime() > this.timeout)
+		{
+			task.blackListHandler();
+		}
+    }
+
+    /**
+     *
+     * Determine the amount of time spent running this task
+     *
+     * @return
+     */
+    public long getTaskTime()
+    {
+    	if(threadId < 0l || startTime < 0l)
+    	{
+    		return 0l;
+    	}
+    	else if(endTime < 0l)
+    	{
+    		return getTimeInMillis() - startTime;
+    	}
+    	return endTime - startTime;
+
+    }
+
+}

Propchange: felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/HandlerTask.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/HandlerTask.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/HandlerTask.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java
URL: http://svn.apache.org/viewvc/felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java?rev=1628132&r1=1628131&r2=1628132&view=diff
==============================================================================
--- felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java (original)
+++ felix/trunk/eventadmin/impl/src/main/java/org/apache/felix/eventadmin/impl/tasks/SyncDeliverTasks.java Mon Sep 29 07:55:52 2014
@@ -20,7 +20,6 @@ package org.apache.felix.eventadmin.impl
 
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.concurrent.TimeoutException;
 
 import org.apache.felix.eventadmin.impl.handler.EventHandlerProxy;
 import org.osgi.service.event.Event;
@@ -80,21 +79,6 @@ public class SyncDeliverTasks
     }
 
     /**
-     * This method defines if a timeout handling should be used for the
-     * task.
-     * @param tasks The event handler dispatch tasks to execute
-     */
-    private boolean useTimeout(final EventHandlerProxy proxy)
-    {
-        // we only check the proxy if a timeout is configured
-        if ( this.timeout > 0)
-        {
-            return proxy.useTimeout();
-        }
-        return false;
-    }
-
-    /**
      * This blocks an unrelated thread used to send a synchronous event until the
      * event is send (or a timeout occurs).
      *
@@ -107,69 +91,34 @@ public class SyncDeliverTasks
         final SyncThread syncThread = sleepingThread instanceof SyncThread ? (SyncThread)sleepingThread : null;
 
         final Iterator<EventHandlerProxy> i = tasks.iterator();
+        final BlacklistLatch handlerLatch = new BlacklistLatch(tasks.size(), this.timeout/2);
+        
         while ( i.hasNext() )
         {
             final EventHandlerProxy task = i.next();
+            HandlerTask handlerTask = new HandlerTask(task, event, this.timeout, handlerLatch);
 //            if ( !filterAsyncUnordered || task.isAsyncOrderedDelivery() )
 //            {
-                if ( !useTimeout(task) )
+                if( !handlerTask.useTimeout() )
                 {
-                    // no timeout, we can directly execute
-                    task.sendEvent(event);
+                	handlerTask.runWithoutBlacklistTiming();
                 }
-                else if ( syncThread != null )
+            	else if ( syncThread != null  )
                 {
                     // if this is a cascaded event, we directly use this thread
                     // otherwise we could end up in a starvation
-                    final long startTime = System.currentTimeMillis();
-                    task.sendEvent(event);
-                    if ( System.currentTimeMillis() - startTime > this.timeout )
-                    {
-                        task.blackListHandler();
-                    }
+                    handlerTask.run();
                 }
                 else
                 {
-                    final Rendezvous startBarrier = new Rendezvous();
-                    final Rendezvous timerBarrier = new Rendezvous();
-                    this.pool.executeTask(new Runnable()
-                    {
-                        @Override
-                        public void run()
-                        {
-                            try
-                            {
-                                // notify the outer thread to start the timer
-                                startBarrier.waitForRendezvous();
-                                // execute the task
-                                task.sendEvent(event);
-                                // stop the timer
-                                timerBarrier.waitForRendezvous();
-                            }
-                            catch (final IllegalStateException ise)
-                            {
-                                // this can happen on shutdown, so we ignore it
-                            }
-                        }
-                    });
-                    // we wait for the inner thread to start
-                    startBarrier.waitForRendezvous();
-
-                    // timeout handling
-                    // we sleep for the sleep time
-                    // if someone wakes us up it's the finished inner task
-                    try
-                    {
-                        timerBarrier.waitAttemptForRendezvous(this.timeout);
-                    }
-                    catch (final TimeoutException ie)
-                    {
-                        // if we timed out, we have to blacklist the handler
-                        task.blackListHandler();
-                    }
-
+                	
+                	handlerLatch.addToBlacklistCheck(handlerTask);
+                    this.pool.executeTask(handlerTask);
                 }
+               
 //            }
         }
+        handlerLatch.awaitAndBlacklistCheck();
+        
     }
 }