You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@felix.apache.org by gu...@apache.org on 2013/10/01 17:35:30 UTC

svn commit: r1528120 - in /felix/trunk/ipojo/runtime/core/src: main/java/org/apache/felix/ipojo/extender/internal/queue/ main/java/org/apache/felix/ipojo/extender/internal/queue/pref/ main/java/org/apache/felix/ipojo/extender/internal/queue/pref/enforc...

Author: guillaume
Date: Tue Oct  1 15:35:30 2013
New Revision: 1528120

URL: http://svn.apache.org/r1528120
Log:
FELIX-4262 QueueServices should be observable

* Added QueueListener to observe from a third-party what's going on in the thread pool

Added:
    felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/AbstractQueueService.java
    felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/QueueNotifier.java
    felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/queue/QueueListener.java
    felix/trunk/ipojo/runtime/core/src/test/java/org/apache/felix/ipojo/extender/internal/queue/AbstractQueueServiceTestCase.java
    felix/trunk/ipojo/runtime/core/src/test/java/org/apache/felix/ipojo/extender/internal/queue/callable/ExceptionCallable.java
Modified:
    felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/ExecutorQueueService.java
    felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/JobInfoCallable.java
    felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/SynchronousQueueService.java
    felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/pref/PreferenceQueueService.java
    felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/pref/enforce/ForwardingQueueService.java
    felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/queue/QueueService.java
    felix/trunk/ipojo/runtime/core/src/test/java/org/apache/felix/ipojo/extender/internal/queue/JobInfoCallableTestCase.java

Added: felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/AbstractQueueService.java
URL: http://svn.apache.org/viewvc/felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/AbstractQueueService.java?rev=1528120&view=auto
==============================================================================
--- felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/AbstractQueueService.java (added)
+++ felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/AbstractQueueService.java Tue Oct  1 15:35:30 2013
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.felix.ipojo.extender.internal.queue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.felix.ipojo.extender.internal.AbstractService;
+import org.apache.felix.ipojo.extender.queue.Callback;
+import org.apache.felix.ipojo.extender.queue.JobInfo;
+import org.apache.felix.ipojo.extender.queue.QueueListener;
+import org.apache.felix.ipojo.extender.queue.QueueService;
+import org.osgi.framework.BundleContext;
+
+/**
+ * Base class for {@link QueueService} implementations that can be observed through
+ * {@link QueueListener}s. It stores the registered listeners and implements
+ * {@link QueueNotifier} by forwarding each event to every listener, in registration order.
+ * <p>
+ * Events are fired from the threads that submit and execute jobs, while listeners may be
+ * added or removed from any other thread, so the listener list is a copy-on-write list.
+ */
+public abstract class AbstractQueueService extends AbstractService implements QueueService, QueueNotifier {
+
+    /**
+     * Store QueueListeners.
+     * A {@link CopyOnWriteArrayList} is used so that iterating over the listeners while firing
+     * an event never fails with a {@link java.util.ConcurrentModificationException} when a
+     * listener is added or removed concurrently (or from within a listener callback).
+     */
+    protected final List<QueueListener> m_listeners = new CopyOnWriteArrayList<QueueListener>();
+
+    /**
+     * Constructor.
+     *
+     * @param bundleContext the bundle context
+     * @param type          the specification
+     */
+    protected AbstractQueueService(final BundleContext bundleContext, final Class<?> type) {
+        super(bundleContext, type);
+    }
+
+    /**
+     * Registers a listener that will receive all subsequent job events.
+     *
+     * @param listener added listener
+     */
+    public void addQueueListener(final QueueListener listener) {
+        m_listeners.add(listener);
+    }
+
+    /**
+     * Unregisters a previously added listener; does nothing if it was not registered.
+     *
+     * @param listener removed listener
+     */
+    public void removeQueueListener(final QueueListener listener) {
+        m_listeners.remove(listener);
+    }
+
+    /**
+     * Notifies every listener that a job has been enlisted.
+     *
+     * @param info the enlisted job
+     */
+    public void fireEnlistedJobInfo(JobInfo info) {
+        for (QueueListener listener : m_listeners) {
+            listener.enlisted(info);
+        }
+    }
+
+    /**
+     * Notifies every listener that a job is about to be executed.
+     *
+     * @param info the started job
+     */
+    public void fireStartedJobInfo(JobInfo info) {
+        for (QueueListener listener : m_listeners) {
+            listener.started(info);
+        }
+    }
+
+    /**
+     * Notifies every listener that a job completed successfully.
+     *
+     * @param info   the executed job
+     * @param result the value produced by the job
+     */
+    public void fireExecutedJobInfo(JobInfo info, Object result) {
+        for (QueueListener listener : m_listeners) {
+            listener.executed(info, result);
+        }
+    }
+
+    /**
+     * Notifies every listener that a job failed.
+     *
+     * @param info      the failed job
+     * @param throwable the error raised by the job
+     */
+    public void fireFailedJobInfo(JobInfo info, Throwable throwable) {
+        for (QueueListener listener : m_listeners) {
+            listener.failed(info, throwable);
+        }
+    }
+
+}

Modified: felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/ExecutorQueueService.java
URL: http://svn.apache.org/viewvc/felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/ExecutorQueueService.java?rev=1528120&r1=1528119&r2=1528120&view=diff
==============================================================================
--- felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/ExecutorQueueService.java (original)
+++ felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/ExecutorQueueService.java Tue Oct  1 15:35:30 2013
@@ -19,7 +19,6 @@
 
 package org.apache.felix.ipojo.extender.internal.queue;
 
-import org.apache.felix.ipojo.extender.internal.AbstractService;
 import org.apache.felix.ipojo.extender.internal.LifecycleQueueService;
 import org.apache.felix.ipojo.extender.queue.Callback;
 import org.apache.felix.ipojo.extender.queue.JobInfo;
@@ -36,7 +35,7 @@ import java.util.concurrent.*;
 /**
  * An asynchronous implementation of the queue service. This implementation relies on an executor service.
  */
-public class ExecutorQueueService extends AbstractService implements LifecycleQueueService, ManagedService {
+public class ExecutorQueueService extends AbstractQueueService implements LifecycleQueueService, ManagedService {
 
     /**
      * Property name used to configure this ThreadPool's size (usable as System Property or ConfigAdmin property).
@@ -183,7 +182,8 @@ public class ExecutorQueueService extend
      * @return the reference on the submitted job
      */
     public <T> Future<T> submit(Callable<T> callable, Callback<T> callback, String description) {
-        return m_executorService.submit(new JobInfoCallable<T>(m_statistic, callable, callback, description));
+        JobInfoCallable<T> task = new JobInfoCallable<T>(this, m_statistic, callable, callback, description);
+        return m_executorService.submit(task);
     }
 
     public <T> Future<T> submit(Callable<T> callable, String description) {

Modified: felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/JobInfoCallable.java
URL: http://svn.apache.org/viewvc/felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/JobInfoCallable.java?rev=1528120&r1=1528119&r2=1528120&view=diff
==============================================================================
--- felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/JobInfoCallable.java (original)
+++ felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/JobInfoCallable.java Tue Oct  1 15:35:30 2013
@@ -31,6 +31,11 @@ import java.util.concurrent.Callable;
 public class JobInfoCallable<T> implements Callable<T>, JobInfo {
 
     /**
+     * Notifier helper for {@link org.apache.felix.ipojo.extender.queue.QueueListener}.
+     */
+    private final QueueNotifier m_queueNotifier;
+
+    /**
      * The statistic object.
      */
     private final Statistic m_statistic;
@@ -68,20 +73,26 @@ public class JobInfoCallable<T> implemen
     /**
      * Creates the job info callable.
      *
+     * @param queueNotifier notifier for QueueListeners
      * @param statistic   the statistics that will be populated
      * @param delegate    the real job
      * @param callback    the callback notified when the job is completed
      * @param description the job description
      */
-    public JobInfoCallable(Statistic statistic,
+    public JobInfoCallable(QueueNotifier queueNotifier,
+                           Statistic statistic,
                            Callable<T> delegate,
                            Callback<T> callback,
                            String description) {
+        m_queueNotifier = queueNotifier;
         m_statistic = statistic;
         m_delegate = delegate;
         m_callback = callback;
         m_description = description;
         m_statistic.getWaiters().add(this);
+
+        // Assume that we will be enlisted in the next few cycles
+        m_queueNotifier.fireEnlistedJobInfo(this);
     }
 
     /**
@@ -96,20 +107,29 @@ public class JobInfoCallable<T> implemen
         startTime = System.currentTimeMillis();
         m_statistic.getCurrentsCounter().incrementAndGet();
         T result = null;
+        Exception exception = null;
         try {
+            m_queueNotifier.fireStartedJobInfo(this);
             result = m_delegate.call();
             return result;
         } catch (Exception e) {
+            m_queueNotifier.fireFailedJobInfo(this, e);
             if (m_callback != null) {
                 m_callback.error(this, e);
             }
+            exception = e;
             throw e;
         } finally {
             m_statistic.getCurrentsCounter().decrementAndGet();
             m_statistic.getFinishedCounter().incrementAndGet();
             endTime = System.currentTimeMillis();
-            if (m_callback != null) {
-                m_callback.success(this, result);
+
+            // Only exec success callbacks when no error occurred
+            if (exception == null) {
+                m_queueNotifier.fireExecutedJobInfo(this, result);
+                if (m_callback != null) {
+                    m_callback.success(this, result);
+                }
             }
         }
     }

Added: felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/QueueNotifier.java
URL: http://svn.apache.org/viewvc/felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/QueueNotifier.java?rev=1528120&view=auto
==============================================================================
--- felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/QueueNotifier.java (added)
+++ felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/QueueNotifier.java Tue Oct  1 15:35:30 2013
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.felix.ipojo.extender.internal.queue;
+
+import org.apache.felix.ipojo.extender.queue.JobInfo;
+
+/**
+ * Internal interface to de-couple event producers (the jobs) from event listeners ({@code QueueListener}s).
+ */
+public interface QueueNotifier {
+    void fireEnlistedJobInfo(JobInfo info);                     // fired by JobInfoCallable when the job is created
+    void fireStartedJobInfo(JobInfo info);                      // fired just before the wrapped job is called
+    void fireExecutedJobInfo(JobInfo info, Object result);      // fired when the job returned without exception
+    void fireFailedJobInfo(JobInfo info, Throwable throwable);  // fired when the job threw an exception
+}

Modified: felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/SynchronousQueueService.java
URL: http://svn.apache.org/viewvc/felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/SynchronousQueueService.java?rev=1528120&r1=1528119&r2=1528120&view=diff
==============================================================================
--- felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/SynchronousQueueService.java (original)
+++ felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/SynchronousQueueService.java Tue Oct  1 15:35:30 2013
@@ -26,6 +26,7 @@ import org.apache.felix.ipojo.extender.q
 import org.apache.felix.ipojo.extender.queue.QueueService;
 import org.osgi.framework.BundleContext;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Dictionary;
 import java.util.Hashtable;
@@ -35,7 +36,7 @@ import java.util.concurrent.*;
 /**
  * An implementation of the Lifecycle Queue Service for synchronous processing.
  */
-public class SynchronousQueueService extends AbstractService implements LifecycleQueueService {
+public class SynchronousQueueService extends AbstractQueueService implements LifecycleQueueService {
 
     private final Statistic m_statistic = new Statistic();
 
@@ -67,7 +68,7 @@ public class SynchronousQueueService ext
     }
 
     public <T> Future<T> submit(Callable<T> callable, Callback<T> callback, String description) {
-        JobInfoCallable<T> exec = new JobInfoCallable<T>(m_statistic, callable, callback, description);
+        JobInfoCallable<T> exec = new JobInfoCallable<T>(this, m_statistic, callable, callback, description);
         try {
             return new ImmediateFuture<T>(exec.call());
         } catch (Exception e) {

Modified: felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/pref/PreferenceQueueService.java
URL: http://svn.apache.org/viewvc/felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/pref/PreferenceQueueService.java?rev=1528120&r1=1528119&r2=1528120&view=diff
==============================================================================
--- felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/pref/PreferenceQueueService.java (original)
+++ felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/pref/PreferenceQueueService.java Tue Oct  1 15:35:30 2013
@@ -22,6 +22,7 @@ package org.apache.felix.ipojo.extender.
 import org.apache.felix.ipojo.extender.internal.LifecycleQueueService;
 import org.apache.felix.ipojo.extender.queue.Callback;
 import org.apache.felix.ipojo.extender.queue.JobInfo;
+import org.apache.felix.ipojo.extender.queue.QueueListener;
 import org.apache.felix.ipojo.extender.queue.QueueService;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleReference;
@@ -158,4 +159,12 @@ public class PreferenceQueueService impl
     public <T> Future<T> submit(Callable<T> callable) {
         return submit(callable, "No description");
     }
+
+    public void addQueueListener(final QueueListener listener) {
+        // Intentionally blank, not intended to have listeners
+    }
+
+    public void removeQueueListener(final QueueListener listener) {
+        // Intentionally blank, not intended to have listeners
+    }
 }
\ No newline at end of file

Modified: felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/pref/enforce/ForwardingQueueService.java
URL: http://svn.apache.org/viewvc/felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/pref/enforce/ForwardingQueueService.java?rev=1528120&r1=1528119&r2=1528120&view=diff
==============================================================================
--- felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/pref/enforce/ForwardingQueueService.java (original)
+++ felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/internal/queue/pref/enforce/ForwardingQueueService.java Tue Oct  1 15:35:30 2013
@@ -22,6 +22,7 @@ package org.apache.felix.ipojo.extender.
 import org.apache.felix.ipojo.extender.internal.LifecycleQueueService;
 import org.apache.felix.ipojo.extender.queue.Callback;
 import org.apache.felix.ipojo.extender.queue.JobInfo;
+import org.apache.felix.ipojo.extender.queue.QueueListener;
 
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -69,4 +70,12 @@ public abstract class ForwardingQueueSer
     public <T> Future<T> submit(Callable<T> callable) {
         return delegate().submit(callable);
     }
+
+    public void addQueueListener(final QueueListener listener) {
+        delegate().addQueueListener(listener);
+    }
+
+    public void removeQueueListener(final QueueListener listener) {
+        delegate().removeQueueListener(listener);
+    }
 }

Added: felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/queue/QueueListener.java
URL: http://svn.apache.org/viewvc/felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/queue/QueueListener.java?rev=1528120&view=auto
==============================================================================
--- felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/queue/QueueListener.java (added)
+++ felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/queue/QueueListener.java Tue Oct  1 15:35:30 2013
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.felix.ipojo.extender.queue;
+
+/**
+ * A {@link org.apache.felix.ipojo.extender.queue.QueueListener} provides queue management information to external entities:
+ * <ul>
+ *     <li>Job submission</li>
+ *     <li>Job execution</li>
+ *     <li>Job result (success or failure)</li>
+ * </ul>
+ *
+ * Implementers of this interface should not block, as listeners are invoked synchronously.
+ * Implementers are responsible for registering themselves with the {@link org.apache.felix.ipojo.extender.queue.QueueService} they'll observe.
+ */
+public interface QueueListener {
+
+    /**
+     * Invoked when a job is just being enlisted (before processing).
+     * Only {@link JobInfo#getEnlistmentTime()} and {@link JobInfo#getWaitDuration()} provide meaningful values.
+     * Note that the {@code waitDuration} value is re-evaluated at each call.
+     * @param info The job being enlisted
+     */
+    void enlisted(JobInfo info);
+
+    /**
+     * Invoked when a job's execution is just about to be started.
+     * Only {@link JobInfo#getEnlistmentTime()}, {@link JobInfo#getWaitDuration()}, {@link JobInfo#getStartTime()}
+     * and {@link JobInfo#getExecutionDuration()} provide meaningful values.
+     * Note that the {@code executionDuration} value is re-evaluated at each call.
+     * @param info The job being started
+     */
+    void started(JobInfo info);
+
+    /**
+     * Invoked when a job's execution has finished successfully.
+     * Note that implementers should not retain any reference to the provided {@code result} (risk of memory leak).
+     * @param info The executed job
+     * @param result The job's result
+     */
+    void executed(JobInfo info, Object result);
+
+    /**
+     * Invoked when a job's execution has finished with an error.
+     * Note that implementers should not retain any reference to the provided {@code throwable} (risk of memory leak).
+     * @param info The failed job
+     * @param throwable The job's thrown exception
+     */
+    void failed(JobInfo info, Throwable throwable);
+}

Modified: felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/queue/QueueService.java
URL: http://svn.apache.org/viewvc/felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/queue/QueueService.java?rev=1528120&r1=1528119&r2=1528120&view=diff
==============================================================================
--- felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/queue/QueueService.java (original)
+++ felix/trunk/ipojo/runtime/core/src/main/java/org/apache/felix/ipojo/extender/queue/QueueService.java Tue Oct  1 15:35:30 2013
@@ -111,8 +111,15 @@ public interface QueueService {
      */
     <T> Future<T> submit(Callable<T> callable);
 
+    /**
+     * Add a {@link QueueListener} that will be notified on events relative to this {@link QueueService}.
+     * @param listener added listener
+     */
+    void addQueueListener(QueueListener listener);
 
-    // TODO Add a way to add global callbacks
-    //<T> void addGlobalCallback(Callback<T> callback);
-    // <T> void removeGlobalCallback(Callback<T> callback);
+    /**
+     * Remove a {@link QueueListener} from this {@link QueueService}.
+     * @param listener removed listener
+     */
+    void removeQueueListener(QueueListener listener);
 }

Added: felix/trunk/ipojo/runtime/core/src/test/java/org/apache/felix/ipojo/extender/internal/queue/AbstractQueueServiceTestCase.java
URL: http://svn.apache.org/viewvc/felix/trunk/ipojo/runtime/core/src/test/java/org/apache/felix/ipojo/extender/internal/queue/AbstractQueueServiceTestCase.java?rev=1528120&view=auto
==============================================================================
--- felix/trunk/ipojo/runtime/core/src/test/java/org/apache/felix/ipojo/extender/internal/queue/AbstractQueueServiceTestCase.java (added)
+++ felix/trunk/ipojo/runtime/core/src/test/java/org/apache/felix/ipojo/extender/internal/queue/AbstractQueueServiceTestCase.java Tue Oct  1 15:35:30 2013
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.felix.ipojo.extender.internal.queue;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
+import org.apache.felix.ipojo.extender.queue.Callback;
+import org.apache.felix.ipojo.extender.queue.JobInfo;
+import org.apache.felix.ipojo.extender.queue.QueueListener;
+import org.apache.felix.ipojo.extender.queue.QueueService;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.osgi.framework.BundleContext;
+import static org.mockito.Mockito.verify;
+
+import junit.framework.TestCase;
+
+/**
+ * Checks that each {@code fireXxx} method of {@link AbstractQueueService} notifies
+ * every registered {@link QueueListener} with the arguments it was given.
+ * The service under test is a stub subclass whose queue operations do nothing.
+ */
+public class AbstractQueueServiceTestCase extends TestCase {
+
+    @Mock
+    private BundleContext m_bundleContext;
+
+    @Mock
+    private JobInfo m_info;
+
+    @Mock
+    private QueueListener m_one;
+
+    @Mock
+    private QueueListener m_two;
+
+    private AbstractQueueService m_queueService;
+
+    @Override
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        m_queueService = new TestableAbstractQueueService(m_bundleContext, QueueService.class);
+    }
+
+    public void testFireEnlistedJobInfo() throws Exception {
+        m_queueService.addQueueListener(m_one);
+        m_queueService.addQueueListener(m_two);
+        m_queueService.fireEnlistedJobInfo(m_info);
+        verify(m_one).enlisted(m_info);
+        verify(m_two).enlisted(m_info);
+    }
+
+    public void testFireStartedJobInfo() throws Exception {
+        m_queueService.addQueueListener(m_one);
+        m_queueService.addQueueListener(m_two);
+        m_queueService.fireStartedJobInfo(m_info);
+        verify(m_one).started(m_info);
+        verify(m_two).started(m_info);
+    }
+
+    public void testFireExecutedJobInfo() throws Exception {
+        m_queueService.addQueueListener(m_one);
+        m_queueService.addQueueListener(m_two);
+        m_queueService.fireExecutedJobInfo(m_info, "hello");
+        verify(m_one).executed(m_info, "hello");
+        verify(m_two).executed(m_info, "hello");
+    }
+
+    public void testFireFailedJobInfo() throws Exception {
+        m_queueService.addQueueListener(m_one);
+        m_queueService.addQueueListener(m_two);
+        Exception throwable = new Exception();
+        m_queueService.fireFailedJobInfo(m_info, throwable);
+        verify(m_one).failed(m_info, throwable);
+        verify(m_two).failed(m_info, throwable);
+    }
+
+    private static class TestableAbstractQueueService extends AbstractQueueService { // static: needs no test instance
+
+        public TestableAbstractQueueService(final BundleContext bundleContext, final Class<?> type) {
+            super(bundleContext, type);
+        }
+
+        public int getFinished() {
+            return 0;
+        }
+
+        public int getWaiters() {
+            return 0;
+        }
+
+        public int getCurrents() {
+            return 0;
+        }
+
+        public List<JobInfo> getWaitersInfo() {
+            return null;
+        }
+
+        public <T> Future<T> submit(final Callable<T> callable, final Callback<T> callback, final String description) {
+            return null;
+        }
+
+        public <T> Future<T> submit(final Callable<T> callable, final String description) {
+            return null;
+        }
+
+        public <T> Future<T> submit(final Callable<T> callable) {
+            return null;
+        }
+    }
+
+}

Modified: felix/trunk/ipojo/runtime/core/src/test/java/org/apache/felix/ipojo/extender/internal/queue/JobInfoCallableTestCase.java
URL: http://svn.apache.org/viewvc/felix/trunk/ipojo/runtime/core/src/test/java/org/apache/felix/ipojo/extender/internal/queue/JobInfoCallableTestCase.java?rev=1528120&r1=1528119&r2=1528120&view=diff
==============================================================================
--- felix/trunk/ipojo/runtime/core/src/test/java/org/apache/felix/ipojo/extender/internal/queue/JobInfoCallableTestCase.java (original)
+++ felix/trunk/ipojo/runtime/core/src/test/java/org/apache/felix/ipojo/extender/internal/queue/JobInfoCallableTestCase.java Tue Oct  1 15:35:30 2013
@@ -19,7 +19,15 @@
 
 package org.apache.felix.ipojo.extender.internal.queue;
 
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import org.apache.felix.ipojo.extender.internal.queue.callable.ExceptionCallable;
 import org.apache.felix.ipojo.extender.internal.queue.callable.StringCallable;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.osgi.framework.BundleContext;
 
 import junit.framework.TestCase;
 
@@ -27,10 +35,19 @@ import junit.framework.TestCase;
  * Checks the job info callable.
  */
 public class JobInfoCallableTestCase extends TestCase {
+
+    @Mock
+    private QueueNotifier m_notifier;
+
+    @Override
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+    }
+
     public void testCall() throws Exception {
         Statistic stat = new Statistic();
         long mark = System.currentTimeMillis();
-        JobInfoCallable<String> info = new JobInfoCallable<String>(stat, new StringCallable(), null, null);
+        JobInfoCallable<String> info = new JobInfoCallable<String>(m_notifier, stat, new StringCallable(), null, null);
 
         // Before execution
         assertTrue(info.getEnlistmentTime() >= mark);
@@ -49,5 +66,31 @@ public class JobInfoCallableTestCase ext
         assertEquals(0, stat.getCurrentsCounter().get());
         assertEquals(1, stat.getFinishedCounter().get());
 
+        InOrder order = Mockito.inOrder(m_notifier);
+        order.verify(m_notifier).fireEnlistedJobInfo(info);
+        order.verify(m_notifier).fireStartedJobInfo(info);
+        order.verify(m_notifier).fireExecutedJobInfo(info, "hello");
+        verifyNoMoreInteractions(m_notifier);
+
+    }
+
+    public void testFailedCall() throws Exception {
+        Statistic stat = new Statistic();
+        Exception e = new Exception();
+        JobInfoCallable<String> info = new JobInfoCallable<String>(m_notifier, stat, new ExceptionCallable(e), null, null);
+
+        try {
+            info.call();
+        } catch (Exception e1) {
+            InOrder order = Mockito.inOrder(m_notifier);
+            order.verify(m_notifier).fireEnlistedJobInfo(info);
+            order.verify(m_notifier).fireStartedJobInfo(info);
+            order.verify(m_notifier).fireFailedJobInfo(info, e);
+            verifyNoMoreInteractions(m_notifier);
+            return;
+        }
+
+        fail("Should have throw an Exception");
+
     }
 }

Added: felix/trunk/ipojo/runtime/core/src/test/java/org/apache/felix/ipojo/extender/internal/queue/callable/ExceptionCallable.java
URL: http://svn.apache.org/viewvc/felix/trunk/ipojo/runtime/core/src/test/java/org/apache/felix/ipojo/extender/internal/queue/callable/ExceptionCallable.java?rev=1528120&view=auto
==============================================================================
--- felix/trunk/ipojo/runtime/core/src/test/java/org/apache/felix/ipojo/extender/internal/queue/callable/ExceptionCallable.java (added)
+++ felix/trunk/ipojo/runtime/core/src/test/java/org/apache/felix/ipojo/extender/internal/queue/callable/ExceptionCallable.java Tue Oct  1 15:35:30 2013
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.felix.ipojo.extender.internal.queue.callable;
+
+import java.util.concurrent.Callable;
+
+/**
+ * A dummy job that always fails: {@link #call()} throws the exception handed to the constructor.
+ */
+public class ExceptionCallable implements Callable<String> {
+
+    private final Exception m_failure; // re-thrown, as-is, by every call()
+
+    public ExceptionCallable(final Exception failure) {
+        this.m_failure = failure;
+    }
+
+    public String call() throws Exception {
+        throw this.m_failure;
+    }
+}