You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by gn...@apache.org on 2018/01/31 20:10:04 UTC

svn commit: r1822826 [4/4] - in /aries/trunk/blueprint: blueprint-cm/src/main/java/org/apache/aries/blueprint/compendium/cm/ blueprint-cm/src/test/java/org/apache/aries/blueprint/compendium/cm/ blueprint-core/src/main/java/org/apache/aries/blueprint/co...

Modified: aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/ScheduledExecutorServiceWrapper.java
URL: http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/ScheduledExecutorServiceWrapper.java?rev=1822826&r1=1822825&r2=1822826&view=diff
==============================================================================
--- aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/ScheduledExecutorServiceWrapper.java (original)
+++ aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/ScheduledExecutorServiceWrapper.java Wed Jan 31 20:10:03 2018
@@ -52,410 +52,407 @@ import org.osgi.framework.InvalidSyntaxE
  * it picks one up from the service registry then it shuts the internal one down. This doesn't fully meet
  * the spec for a SchedueledExecutorService. It does not properly implement shutdownNow, but this isn't used
  * by blueprint so for now that should be fine.
- * 
+ * <p>
  * <p>It also wraps the Runnables and Callables so when a task is canceled we quickly clean up memory rather
- *   than waiting for the target to get to the task and purge it.
+ * than waiting for the target to get to the task and purge it.
  * </p>
  */
-public class ScheduledExecutorServiceWrapper implements ScheduledExecutorService, SingleServiceListener
-{
-  public static interface ScheduledExecutorServiceFactory
-  {
-    public ScheduledExecutorService create(String name);
-  }
-
-  private final AtomicReference<ScheduledExecutorService> _current = new AtomicReference<ScheduledExecutorService>();
-  private SingleServiceTracker<ScheduledExecutorService> _tracked;
-  private final AtomicReference<ScheduledExecutorService> _default = new AtomicReference<ScheduledExecutorService>();
-  private final AtomicBoolean _shutdown = new AtomicBoolean();
-  private final Queue<Discardable<Runnable>> _unprocessedWork = new LinkedBlockingQueue<Discardable<Runnable>>();
-  private final RWLock _lock = new RWLock();
-  private final AtomicInteger _invokeEntryCount = new AtomicInteger();
-  private final ScheduledExecutorServiceFactory _factory;
-  private final String _name;
-  
-  public ScheduledExecutorServiceWrapper(BundleContext context, String name, ScheduledExecutorServiceFactory sesf)
-  {
-    _name = name;
-    _factory = sesf;
-    try {
-      _tracked = new SingleServiceTracker<ScheduledExecutorService>(context, ScheduledExecutorService.class, "(aries.blueprint.poolName=" + _name + ")", this);
-      _tracked.open();
-    } catch (InvalidSyntaxException e) {
-      // Just ignore and stick with the default one.
-    }
-
-    if (_current.get() == null) {
-      _default.set(_factory.create(name));
-      if (!!!_current.compareAndSet(null, _default.get())) {
-        _default.getAndSet(null).shutdown();
-      }
-    }
-  }
-
-  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
-  {
-    long timeLeftToWait = unit.toMillis(timeout);
-    long pausePeriod = timeLeftToWait;
-    if (pausePeriod > 1000) pausePeriod = 1000;
-    while (!!!_unprocessedWork.isEmpty() && _invokeEntryCount.get() > 0 && timeLeftToWait > 0) {
-      Thread.sleep(pausePeriod);
-      timeLeftToWait -= pausePeriod;
-      if (timeLeftToWait < pausePeriod) pausePeriod = timeLeftToWait;
-    }
-    return _unprocessedWork.isEmpty() && _invokeEntryCount.get() > 0;
-  }
-
-  public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks)
-      throws InterruptedException
-  {
-    try {
-      return runUnlessShutdown(new Callable<List<Future<T>>>() {
-
-        public List<Future<T>> call() throws Exception
-        {
-          _invokeEntryCount.incrementAndGet();
-          try {
-            return _current.get().invokeAll(tasks);
-          } finally {
-            _invokeEntryCount.decrementAndGet();
-          }
-        }
-        
-      });
-    } catch (InterruptedException e) { throw e;
-    } catch (Exception e) { throw new RejectedExecutionException(); }
-  }
-
-  public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks, 
-      final long timeout,
-      final TimeUnit unit) throws InterruptedException
-  {
-    try {
-      return runUnlessShutdown(new Callable<List<Future<T>>>() {
-
-        public List<Future<T>> call() throws Exception
-        {
-          _invokeEntryCount.incrementAndGet();
-          try {
-            return _current.get().invokeAll(tasks, timeout, unit);
-          } finally {
-            _invokeEntryCount.decrementAndGet();
-          }
-        }
-        
-      });
-    } catch (InterruptedException e) { throw e;
-    } catch (Exception e) { throw new RejectedExecutionException(); }
-  }
-
-  public <T> T invokeAny(final Collection<? extends Callable<T>> tasks) throws InterruptedException,
-      ExecutionException
-  {
-    try {
-      return runUnlessShutdown(new Callable<T>() {
-
-        public T call() throws Exception
-        {
-          _invokeEntryCount.incrementAndGet();
-          try {
-            return _current.get().invokeAny(tasks);
-          } finally {
-            _invokeEntryCount.decrementAndGet();
-          }
-        }
-        
-      });
-    } catch (InterruptedException e) { throw e;
-    } catch (ExecutionException e) { throw e;
-    } catch (Exception e) { throw new RejectedExecutionException(); }
-  }
-
-  public <T> T invokeAny(final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit)
-      throws InterruptedException, ExecutionException, TimeoutException
-  {
-    try {
-      return runUnlessShutdown(new Callable<T>() {
-
-        public T call() throws Exception
-        {
-          _invokeEntryCount.incrementAndGet();
-          try {
-            return _current.get().invokeAny(tasks, timeout, unit);
-          } finally {
-            _invokeEntryCount.decrementAndGet();
-          }
-        }
-        
-      });
-    } catch (InterruptedException e) { throw e;
-    } catch (ExecutionException e) { throw e;
-    } catch (TimeoutException e) { throw e;
-    } catch (Exception e) { throw new RejectedExecutionException(); }
-  }
-
-  public boolean isShutdown()
-  {
-    return _shutdown.get();
-  }
-
-  public boolean isTerminated()
-  {
-    if (isShutdown()) return _unprocessedWork.isEmpty();
-    else return false;
-  }
-
-  public void shutdown()
-  {
-    _lock.runWriteOperation(new Runnable() {
-      
-      public void run()
-      {
-        _shutdown.set(true);
+public class ScheduledExecutorServiceWrapper implements ScheduledExecutorService, SingleServiceListener {
+    public static interface ScheduledExecutorServiceFactory {
+        public ScheduledExecutorService create(String name);
+    }
+
+    private final AtomicReference<ScheduledExecutorService> _current = new AtomicReference<ScheduledExecutorService>();
+    private SingleServiceTracker<ScheduledExecutorService> _tracked;
+    private final AtomicReference<ScheduledExecutorService> _default = new AtomicReference<ScheduledExecutorService>();
+    private final AtomicBoolean _shutdown = new AtomicBoolean();
+    private final Queue<Discardable<Runnable>> _unprocessedWork = new LinkedBlockingQueue<Discardable<Runnable>>();
+    private final RWLock _lock = new RWLock();
+    private final AtomicInteger _invokeEntryCount = new AtomicInteger();
+    private final ScheduledExecutorServiceFactory _factory;
+    private final String _name;
+
+    public ScheduledExecutorServiceWrapper(BundleContext context, String name, ScheduledExecutorServiceFactory sesf) {
+        _name = name;
+        _factory = sesf;
+        try {
+            _tracked = new SingleServiceTracker<ScheduledExecutorService>(context, ScheduledExecutorService.class, "(aries.blueprint.poolName=" + _name + ")", this);
+            _tracked.open();
+        } catch (InvalidSyntaxException e) {
+            // Just ignore and stick with the default one.
+        }
+
+        if (_current.get() == null) {
+            _default.set(_factory.create(name));
+            if (!!!_current.compareAndSet(null, _default.get())) {
+                _default.getAndSet(null).shutdown();
+            }
+        }
+    }
+
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+        long timeLeftToWait = unit.toMillis(timeout);
+        long pausePeriod = timeLeftToWait;
+        if (pausePeriod > 1000) pausePeriod = 1000;
+        while (!!!_unprocessedWork.isEmpty() && _invokeEntryCount.get() > 0 && timeLeftToWait > 0) {
+            Thread.sleep(pausePeriod);
+            timeLeftToWait -= pausePeriod;
+            if (timeLeftToWait < pausePeriod) pausePeriod = timeLeftToWait;
+        }
+        return _unprocessedWork.isEmpty() && _invokeEntryCount.get() > 0;
+    }
+
+    public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks)
+            throws InterruptedException {
+        try {
+            return runUnlessShutdown(new Callable<List<Future<T>>>() {
+
+                public List<Future<T>> call() throws Exception {
+                    _invokeEntryCount.incrementAndGet();
+                    try {
+                        return _current.get().invokeAll(tasks);
+                    } finally {
+                        _invokeEntryCount.decrementAndGet();
+                    }
+                }
+
+            });
+        } catch (InterruptedException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RejectedExecutionException();
+        }
+    }
+
+    public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks,
+                                         final long timeout,
+                                         final TimeUnit unit) throws InterruptedException {
+        try {
+            return runUnlessShutdown(new Callable<List<Future<T>>>() {
+
+                public List<Future<T>> call() throws Exception {
+                    _invokeEntryCount.incrementAndGet();
+                    try {
+                        return _current.get().invokeAll(tasks, timeout, unit);
+                    } finally {
+                        _invokeEntryCount.decrementAndGet();
+                    }
+                }
+
+            });
+        } catch (InterruptedException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RejectedExecutionException();
+        }
+    }
+
+    public <T> T invokeAny(final Collection<? extends Callable<T>> tasks) throws InterruptedException,
+            ExecutionException {
+        try {
+            return runUnlessShutdown(new Callable<T>() {
+
+                public T call() throws Exception {
+                    _invokeEntryCount.incrementAndGet();
+                    try {
+                        return _current.get().invokeAny(tasks);
+                    } finally {
+                        _invokeEntryCount.decrementAndGet();
+                    }
+                }
+
+            });
+        } catch (InterruptedException e) {
+            throw e;
+        } catch (ExecutionException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RejectedExecutionException();
+        }
+    }
+
+    public <T> T invokeAny(final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        try {
+            return runUnlessShutdown(new Callable<T>() {
+
+                public T call() throws Exception {
+                    _invokeEntryCount.incrementAndGet();
+                    try {
+                        return _current.get().invokeAny(tasks, timeout, unit);
+                    } finally {
+                        _invokeEntryCount.decrementAndGet();
+                    }
+                }
+
+            });
+        } catch (InterruptedException e) {
+            throw e;
+        } catch (ExecutionException e) {
+            throw e;
+        } catch (TimeoutException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RejectedExecutionException();
+        }
+    }
+
+    public boolean isShutdown() {
+        return _shutdown.get();
+    }
+
+    public boolean isTerminated() {
+        if (isShutdown()) return _unprocessedWork.isEmpty();
+        else return false;
+    }
+
+    public void shutdown() {
+        _lock.runWriteOperation(new Runnable() {
+
+            public void run() {
+                _shutdown.set(true);
+                ScheduledExecutorService s = _default.get();
+
+                if (s != null) s.shutdown();
+            }
+        });
+    }
+
+    public List<Runnable> shutdownNow() {
+        try {
+            return _lock.runWriteOperation(new Callable<List<Runnable>>() {
+
+                public List<Runnable> call() {
+                    _shutdown.set(true);
+
+                    ScheduledExecutorService s = _default.get();
+
+                    if (s != null) s.shutdownNow();
+
+                    List<Runnable> runnables = new ArrayList<Runnable>();
+
+                    for (Discardable<Runnable> r : _unprocessedWork) {
+                        Runnable newRunnable = r.discard();
+                        if (newRunnable != null) {
+                            runnables.add(newRunnable);
+                        }
+                    }
+
+                    return runnables;
+                }
+
+            });
+        } catch (Exception e) {
+            // This wont happen since our callable doesn't throw any exceptions, so we just return an empty list
+            return Collections.emptyList();
+        }
+    }
+
+    public <T> Future<T> submit(final Callable<T> task) {
+        try {
+            return runUnlessShutdown(new Callable<Future<T>>() {
+
+                public Future<T> call() throws Exception {
+                    DiscardableCallable<T> t = new DiscardableCallable<T>(task, _unprocessedWork);
+                    try {
+                        return new WrappedFuture<T>(_current.get().submit((Callable<T>) t), t);
+                    } catch (RuntimeException e) {
+                        t.discard();
+                        throw e;
+                    }
+                }
+
+            });
+        } catch (Exception e) {
+            throw new RejectedExecutionException();
+        }
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public Future<?> submit(final Runnable task) {
+        try {
+            return runUnlessShutdown(new Callable<Future<?>>() {
+
+                public Future<?> call() {
+                    DiscardableRunnable t = new DiscardableRunnable(task, _unprocessedWork);
+                    try {
+                        return new WrappedFuture(_current.get().submit(t), t);
+                    } catch (RuntimeException e) {
+                        t.discard();
+                        throw e;
+                    }
+                }
+            });
+        } catch (Exception e) {
+            throw new RejectedExecutionException();
+        }
+    }
+
+    public <T> Future<T> submit(final Runnable task, final T result) {
+        try {
+            return runUnlessShutdown(new Callable<Future<T>>() {
+
+                public Future<T> call() {
+                    DiscardableRunnable t = new DiscardableRunnable(task, _unprocessedWork);
+                    try {
+                        return new WrappedFuture<T>(_current.get().submit(t, result), t);
+                    } catch (RuntimeException e) {
+                        t.discard();
+                        throw e;
+                    }
+                }
+            });
+        } catch (Exception e) {
+            throw new RejectedExecutionException();
+        }
+    }
+
+    public void execute(final Runnable command) {
+        try {
+            runUnlessShutdown(new Callable<Object>() {
+
+                public Object call() {
+                    DiscardableRunnable t = new DiscardableRunnable(command, _unprocessedWork);
+                    try {
+                        _current.get().execute(t);
+                    } catch (RuntimeException e) {
+                        t.discard();
+                        throw e;
+                    }
+                    return null;
+                }
+            });
+        } catch (Exception e) {
+            throw new RejectedExecutionException();
+        }
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) {
+        try {
+            return runUnlessShutdown(new Callable<ScheduledFuture<?>>() {
+
+                public ScheduledFuture<?> call() {
+                    DiscardableRunnable t = new DiscardableRunnable(command, _unprocessedWork);
+                    try {
+                        return new WrappedScheduledFuture(_current.get().schedule(t, delay, unit), t);
+                    } catch (RuntimeException e) {
+                        t.discard();
+                        throw e;
+                    }
+                }
+            });
+        } catch (Exception e) {
+            throw new RejectedExecutionException();
+        }
+    }
+
+    public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit) {
+        try {
+            return runUnlessShutdown(new Callable<ScheduledFuture<V>>() {
+
+                public ScheduledFuture<V> call() {
+                    DiscardableCallable<V> c = new DiscardableCallable<V>(callable, _unprocessedWork);
+                    try {
+                        return new WrappedScheduledFuture<V>(_current.get().schedule((Callable<V>) c, delay, unit), c);
+                    } catch (RuntimeException e) {
+                        c.discard();
+                        throw e;
+                    }
+                }
+            });
+        } catch (Exception e) {
+            throw new RejectedExecutionException();
+        }
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period,
+                                                  final TimeUnit unit) {
+        try {
+            return runUnlessShutdown(new Callable<ScheduledFuture<?>>() {
+
+                public ScheduledFuture<?> call() {
+                    DiscardableRunnable t = new DiscardableRunnable(command, _unprocessedWork);
+                    try {
+                        return new WrappedScheduledFuture(_current.get().scheduleAtFixedRate(t, initialDelay, period, unit), t);
+                    } catch (RuntimeException e) {
+                        t.discard();
+                        throw e;
+                    }
+                }
+            });
+        } catch (Exception e) {
+            throw new RejectedExecutionException();
+        }
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay,
+                                                     final TimeUnit unit) {
+        try {
+            return runUnlessShutdown(new Callable<ScheduledFuture<?>>() {
+
+                public ScheduledFuture<?> call() {
+                    DiscardableRunnable t = new DiscardableRunnable(command, _unprocessedWork);
+                    try {
+                        return new WrappedScheduledFuture(_current.get().scheduleWithFixedDelay(t, initialDelay, delay, unit), t);
+                    } catch (RuntimeException e) {
+                        t.discard();
+                        throw e;
+                    }
+                }
+            });
+        } catch (Exception e) {
+            throw new RejectedExecutionException();
+        }
+    }
+
+    public void serviceFound() {
         ScheduledExecutorService s = _default.get();
-        
-        if (s != null) s.shutdown();
-      }
-    });
-  }
-
-  public List<Runnable> shutdownNow()
-  {
-    try {
-      return _lock.runWriteOperation(new Callable<List<Runnable>>() {
-
-        public List<Runnable> call()
-        {
-          _shutdown.set(true);
-          
-          ScheduledExecutorService s = _default.get();
-          
-          if (s != null) s.shutdownNow();
-          
-          List<Runnable> runnables = new ArrayList<Runnable>();
-          
-          for (Discardable<Runnable> r : _unprocessedWork) {
-            Runnable newRunnable = r.discard();
-            if (newRunnable != null) {
-              runnables.add(newRunnable);
+        if (_current.compareAndSet(s, _tracked.getService())) {
+            if (s != null) {
+                if (_default.compareAndSet(s, null)) {
+                    s.shutdown();
+                }
             }
-          }
-          
-          return runnables;
-        }
-        
-      });
-    } catch (Exception e) {
-      // This wont happen since our callable doesn't throw any exceptions, so we just return an empty list
-      return Collections.emptyList();
-    }
-  }
-
-  public <T> Future<T> submit(final Callable<T> task)
-  {
-    try {
-      return runUnlessShutdown(new Callable<Future<T>>() {
-
-        public Future<T> call() throws Exception
-        {
-          DiscardableCallable<T> t = new DiscardableCallable<T>(task, _unprocessedWork);
-          try {
-            return new WrappedFuture<T>(_current.get().submit((Callable<T>)t), t) ;
-          } catch (RuntimeException e) {
-            t.discard();
-            throw e;
-          }
-        }
-        
-      });
-    } catch (Exception e) { throw new RejectedExecutionException(); }
-  }
-
-  @SuppressWarnings({ "unchecked", "rawtypes" })
-  public Future<?> submit(final Runnable task)
-  {
-    try {
-      return runUnlessShutdown(new Callable<Future<?>>() {
-
-        public Future<?> call()
-        {
-          DiscardableRunnable t = new DiscardableRunnable(task, _unprocessedWork);
-          try {
-            return new WrappedFuture(_current.get().submit(t), t);
-          } catch (RuntimeException e) {
-            t.discard();
-            throw e;
-          }
-        }
-      });
-    } catch (Exception e) { throw new RejectedExecutionException(); }
-  }
-
-  public <T> Future<T> submit(final Runnable task, final T result)
-  {
-    try {
-      return runUnlessShutdown(new Callable<Future<T>>() {
-
-        public Future<T> call()
-        {
-          DiscardableRunnable t = new DiscardableRunnable(task, _unprocessedWork);
-          try {
-            return new WrappedFuture<T>(_current.get().submit(t, result), t);
-          } catch (RuntimeException e) {
-            t.discard();
-            throw e;
-          }
-        }
-      });
-    } catch (Exception e) { throw new RejectedExecutionException(); }
-  }
-
-  public void execute(final Runnable command)
-  {
-    try {
-      runUnlessShutdown(new Callable<Object>() {
-
-        public Object call()
-        {
-          DiscardableRunnable t = new DiscardableRunnable(command, _unprocessedWork);
-          try {
-            _current.get().execute(t);
-          } catch (RuntimeException e) {
-            t.discard();
-            throw e;
-          }
-          return null;
-        }
-      });
-    } catch (Exception e) { throw new RejectedExecutionException(); }
-  }
-
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit)
-  {
-    try {
-      return runUnlessShutdown(new Callable<ScheduledFuture<?>>() {
-
-        public ScheduledFuture<?> call()
-        {
-          DiscardableRunnable t = new DiscardableRunnable(command, _unprocessedWork);
-          try {
-            return new WrappedScheduledFuture(_current.get().schedule(t, delay, unit), t);
-          } catch (RuntimeException e) {
-            t.discard();
-            throw e;
-          }
-        }
-      });
-    } catch (Exception e) { throw new RejectedExecutionException(); }
-  }
-
-  public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit)
-  {
-    try {
-      return runUnlessShutdown(new Callable<ScheduledFuture<V>>() {
-
-        public ScheduledFuture<V> call()
-        {
-          DiscardableCallable<V> c = new DiscardableCallable<V>(callable, _unprocessedWork);
-          try {
-            return new WrappedScheduledFuture<V>(_current.get().schedule((Callable<V>)c, delay, unit), c);
-          } catch (RuntimeException e) {
-            c.discard();
-            throw e;
-          }
-        }
-      });
-    } catch (Exception e) { throw new RejectedExecutionException(); }
-  }
-
-  @SuppressWarnings({ "unchecked", "rawtypes" })
-  public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period,
-      final TimeUnit unit)
-  {
-    try {
-      return runUnlessShutdown(new Callable<ScheduledFuture<?>>() {
-
-        public ScheduledFuture<?> call()
-        {
-          DiscardableRunnable t = new DiscardableRunnable(command, _unprocessedWork);
-          try {
-            return new WrappedScheduledFuture(_current.get().scheduleAtFixedRate(t, initialDelay, period, unit), t);
-          } catch (RuntimeException e) {
-            t.discard();
-            throw e;
-          }
-        }
-      });
-    } catch (Exception e) { throw new RejectedExecutionException(); }
-  }
-
-  @SuppressWarnings({ "unchecked", "rawtypes" })
-  public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay,
-      final TimeUnit unit)
-  {
-    try {
-      return runUnlessShutdown(new Callable<ScheduledFuture<?>>() {
-
-        public ScheduledFuture<?> call()
-        {
-          DiscardableRunnable t = new DiscardableRunnable(command, _unprocessedWork);
-          try {
-            return new WrappedScheduledFuture(_current.get().scheduleWithFixedDelay(t, initialDelay, delay, unit), t);
-          } catch (RuntimeException e) {
-            t.discard();
-            throw e;
-          }
-        }
-      });
-    } catch (Exception e) { throw new RejectedExecutionException(); }
-  }
-
-  public void serviceFound()
-  {
-    ScheduledExecutorService s = _default.get();
-    if (_current.compareAndSet(s, _tracked.getService())) {
-      if (s != null) {
-        if (_default.compareAndSet(s, null)) {
-          s.shutdown();
-        }
-      }
-    }
-  }
-
-  // TODO when lost or replaced we need to move work to the "new" _current. This is a huge change because the futures are not currently stored.
-  public void serviceLost()
-  {
-    ScheduledExecutorService s = _default.get();
-    
-    if (s == null) {
-      s = _factory.create(_name);
-      if (_default.compareAndSet(null, s)) {
-        _current.set(s);
-      }
-    }
-  }
-
-  public void serviceReplaced()
-  {
-    _current.set(_tracked.getService());
-  }
-  
-  private <T> T runUnlessShutdown(final Callable<T> call) throws InterruptedException, ExecutionException, TimeoutException
-  {
-    try {
-      return _lock.runReadOperation(new Callable<T>() 
-          {
-            public T call() throws Exception
-            {
-              if (isShutdown()) throw new RejectedExecutionException();
-              return call.call();
+        }
+    }
+
+    // TODO when lost or replaced we need to move work to the "new" _current. This is a huge change because the futures are not currently stored.
+    public void serviceLost() {
+        ScheduledExecutorService s = _default.get();
+
+        if (s == null) {
+            s = _factory.create(_name);
+            if (_default.compareAndSet(null, s)) {
+                _current.set(s);
             }
-          });
-    } catch (InterruptedException e) { throw e; 
-    } catch (ExecutionException e) { throw e;
-    } catch (TimeoutException e) { throw e;
-    } catch (RuntimeException e) { throw e;
-    } catch (Exception e) { throw new RejectedExecutionException(); }
-  }
+        }
+    }
+
+    public void serviceReplaced() {
+        _current.set(_tracked.getService());
+    }
+
+    private <T> T runUnlessShutdown(final Callable<T> call) throws InterruptedException, ExecutionException, TimeoutException {
+        try {
+            return _lock.runReadOperation(new Callable<T>() {
+                public T call() throws Exception {
+                    if (isShutdown()) throw new RejectedExecutionException();
+                    return call.call();
+                }
+            });
+        } catch (InterruptedException e) {
+            throw e;
+        } catch (ExecutionException e) {
+            throw e;
+        } catch (TimeoutException e) {
+            throw e;
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RejectedExecutionException();
+        }
+    }
 }
\ No newline at end of file

Modified: aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/Discardable.java
URL: http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/Discardable.java?rev=1822826&r1=1822825&r2=1822826&view=diff
==============================================================================
--- aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/Discardable.java (original)
+++ aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/Discardable.java Wed Jan 31 20:10:03 2018
@@ -19,5 +19,5 @@
 package org.apache.aries.blueprint.utils.threading.impl;
 
 public interface Discardable<T> {
-  public <T> T discard();
+    public <T> T discard();
 }
\ No newline at end of file

Modified: aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableCallable.java
URL: http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableCallable.java?rev=1822826&r1=1822825&r2=1822826&view=diff
==============================================================================
--- aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableCallable.java (original)
+++ aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableCallable.java Wed Jan 31 20:10:03 2018
@@ -25,44 +25,39 @@ import java.util.concurrent.Cancellation
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicReference;
 
-public class DiscardableCallable<V> implements Callable<V>, Runnable, Discardable<Runnable>
-{
-  private AtomicReference<Callable<V>> c = new AtomicReference<Callable<V>>();
-  private Queue<Discardable<Runnable>> _removeFromListOnCall;
-  
-  public DiscardableCallable(Callable<V> call, Queue<Discardable<Runnable>> _unprocessedWork) {
-    c.set(call);
-    _removeFromListOnCall = _unprocessedWork;
-    _removeFromListOnCall.add(this);
-  }
-
-  private DiscardableCallable(Callable<V> call)
-  {
-    c.set(call);
-    _removeFromListOnCall = new LinkedBlockingQueue<Discardable<Runnable>>();
-  }
-
-  public Runnable discard()
-  {
-    _removeFromListOnCall.remove(this);
-    return new DiscardableCallable<V>(c.getAndSet(null)) ;
-  }
-
-  public V call() throws Exception
-  {
-    _removeFromListOnCall.remove(this);
-    Callable<V> call = c.get();
-    if (call != null) {
-      return call.call();
+public class DiscardableCallable<V> implements Callable<V>, Runnable, Discardable<Runnable> {
+    private AtomicReference<Callable<V>> c = new AtomicReference<Callable<V>>();
+    private Queue<Discardable<Runnable>> _removeFromListOnCall;
+
+    public DiscardableCallable(Callable<V> call, Queue<Discardable<Runnable>> _unprocessedWork) {
+        c.set(call);
+        _removeFromListOnCall = _unprocessedWork;
+        _removeFromListOnCall.add(this);
+    }
+
+    private DiscardableCallable(Callable<V> call) {
+        c.set(call);
+        _removeFromListOnCall = new LinkedBlockingQueue<Discardable<Runnable>>();
+    }
+
+    public Runnable discard() {
+        _removeFromListOnCall.remove(this);
+        return new DiscardableCallable<V>(c.getAndSet(null));
+    }
+
+    public V call() throws Exception {
+        _removeFromListOnCall.remove(this);
+        Callable<V> call = c.get();
+        if (call != null) {
+            return call.call();
+        }
+        throw new CancellationException();
     }
-    throw new CancellationException();
-  }
 
-  public void run()
-  {
-    try {
-      call();
-    } catch (Exception e) {
+    public void run() {
+        try {
+            call();
+        } catch (Exception e) {
+        }
     }
-  }
 }
\ No newline at end of file

Modified: aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableRunnable.java
URL: http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableRunnable.java?rev=1822826&r1=1822825&r2=1822826&view=diff
==============================================================================
--- aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableRunnable.java (original)
+++ aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableRunnable.java Wed Jan 31 20:10:03 2018
@@ -23,35 +23,31 @@ import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicReference;
 
-public class DiscardableRunnable implements Runnable, Discardable<Runnable>
-{
-  private AtomicReference<Runnable> r = new AtomicReference<Runnable>();
-  private Queue<Discardable<Runnable>> _removeFromListOnRun;
-  
-  public DiscardableRunnable(Runnable run, Queue<Discardable<Runnable>> _unprocessedWork) {
-    r.set(run);
-    _removeFromListOnRun = _unprocessedWork;
-    _removeFromListOnRun.add(this);
-  }
+public class DiscardableRunnable implements Runnable, Discardable<Runnable> {
+    private AtomicReference<Runnable> r = new AtomicReference<Runnable>();
+    private Queue<Discardable<Runnable>> _removeFromListOnRun;
 
-  private DiscardableRunnable(Runnable run)
-  {
-    r.set(run);
-    _removeFromListOnRun = new LinkedBlockingQueue<Discardable<Runnable>>();
-  }
+    public DiscardableRunnable(Runnable run, Queue<Discardable<Runnable>> _unprocessedWork) {
+        r.set(run);
+        _removeFromListOnRun = _unprocessedWork;
+        _removeFromListOnRun.add(this);
+    }
+
+    private DiscardableRunnable(Runnable run) {
+        r.set(run);
+        _removeFromListOnRun = new LinkedBlockingQueue<Discardable<Runnable>>();
+    }
 
-  public void run()
-  {
-    _removeFromListOnRun.remove(this);
-    Runnable run = r.get();
-    if (run != null) {
-      run.run();
+    public void run() {
+        _removeFromListOnRun.remove(this);
+        Runnable run = r.get();
+        if (run != null) {
+            run.run();
+        }
     }
-  }
 
-  public Runnable discard()
-  {
-    _removeFromListOnRun.remove(this);
-    return new DiscardableRunnable(r.getAndSet(null));
-  }
+    public Runnable discard() {
+        _removeFromListOnRun.remove(this);
+        return new DiscardableRunnable(r.getAndSet(null));
+    }
 }
\ No newline at end of file

Modified: aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedFuture.java
URL: http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedFuture.java?rev=1822826&r1=1822825&r2=1822826&view=diff
==============================================================================
--- aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedFuture.java (original)
+++ aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedFuture.java Wed Jan 31 20:10:03 2018
@@ -23,43 +23,37 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-public class WrappedFuture<T> implements Future<T>
-{
-  private Discardable<?> _discardable;
-  private Future<T> _future;
-  
-  public WrappedFuture(Future<T> f, Discardable<?> d) {
-    _future = f;
-    _discardable = d;
-  }
-  
-  public boolean cancel(boolean arg0)
-  {
-    boolean result = _future.cancel(arg0);
-    
-    if (result) _discardable.discard();
-    
-    return result;
-  }
-
-  public T get() throws InterruptedException, ExecutionException
-  {
-    return _future.get();
-  }
-
-  public T get(long timeout, TimeUnit timeunit) throws InterruptedException, ExecutionException,
-      TimeoutException
-  {
-    return _future.get(timeout, timeunit);
-  }
-
-  public boolean isCancelled()
-  {
-    return _future.isCancelled();
-  }
-
-  public boolean isDone()
-  {
-    return _future.isDone();
-  }
+public class WrappedFuture<T> implements Future<T> {
+    private Discardable<?> _discardable;
+    private Future<T> _future;
+
+    public WrappedFuture(Future<T> f, Discardable<?> d) {
+        _future = f;
+        _discardable = d;
+    }
+
+    public boolean cancel(boolean arg0) {
+        boolean result = _future.cancel(arg0);
+
+        if (result) _discardable.discard();
+
+        return result;
+    }
+
+    public T get() throws InterruptedException, ExecutionException {
+        return _future.get();
+    }
+
+    public T get(long timeout, TimeUnit timeunit) throws InterruptedException, ExecutionException,
+            TimeoutException {
+        return _future.get(timeout, timeunit);
+    }
+
+    public boolean isCancelled() {
+        return _future.isCancelled();
+    }
+
+    public boolean isDone() {
+        return _future.isDone();
+    }
 }
\ No newline at end of file

Modified: aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedScheduledFuture.java
URL: http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedScheduledFuture.java?rev=1822826&r1=1822825&r2=1822826&view=diff
==============================================================================
--- aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedScheduledFuture.java (original)
+++ aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedScheduledFuture.java Wed Jan 31 20:10:03 2018
@@ -25,53 +25,45 @@ import java.util.concurrent.ScheduledFut
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-public class WrappedScheduledFuture<T> implements ScheduledFuture<T>
-{
-  private Discardable<?> _discardable;
-  private ScheduledFuture<T> _future;
-  
-  public WrappedScheduledFuture(ScheduledFuture<T> f, Discardable<?> d) {
-    _future = f;
-    _discardable = d;
-  }
-
-  public long getDelay(TimeUnit timeunit)
-  {
-    return _future.getDelay(timeunit);
-  }
-
-  public int compareTo(Delayed other)
-  {
-    return _future.compareTo(other);
-  }
-
-  public boolean cancel(boolean arg0)
-  {
-    boolean result = _future.cancel(arg0);
-    
-    if (result) _discardable.discard();
-    
-    return result;
-  }
-
-  public T get() throws InterruptedException, ExecutionException
-  {
-    return _future.get();
-  }
-
-  public T get(long timeout, TimeUnit timeunit) throws InterruptedException, ExecutionException,
-      TimeoutException
-  {
-    return _future.get(timeout, timeunit);
-  }
-
-  public boolean isCancelled()
-  {
-    return _future.isCancelled();
-  }
-
-  public boolean isDone()
-  {
-    return _future.isDone();
-  }
+public class WrappedScheduledFuture<T> implements ScheduledFuture<T> {
+    private Discardable<?> _discardable;
+    private ScheduledFuture<T> _future;
+
+    public WrappedScheduledFuture(ScheduledFuture<T> f, Discardable<?> d) {
+        _future = f;
+        _discardable = d;
+    }
+
+    public long getDelay(TimeUnit timeunit) {
+        return _future.getDelay(timeunit);
+    }
+
+    public int compareTo(Delayed other) {
+        return _future.compareTo(other);
+    }
+
+    public boolean cancel(boolean arg0) {
+        boolean result = _future.cancel(arg0);
+
+        if (result) _discardable.discard();
+
+        return result;
+    }
+
+    public T get() throws InterruptedException, ExecutionException {
+        return _future.get();
+    }
+
+    public T get(long timeout, TimeUnit timeunit) throws InterruptedException, ExecutionException,
+            TimeoutException {
+        return _future.get(timeout, timeunit);
+    }
+
+    public boolean isCancelled() {
+        return _future.isCancelled();
+    }
+
+    public boolean isDone() {
+        return _future.isDone();
+    }
 }
\ No newline at end of file

Modified: aries/trunk/blueprint/blueprint-core/src/test/java/org/apache/aries/blueprint/AbstractBlueprintTest.java
URL: http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/test/java/org/apache/aries/blueprint/AbstractBlueprintTest.java?rev=1822826&r1=1822825&r2=1822826&view=diff
==============================================================================
--- aries/trunk/blueprint/blueprint-core/src/test/java/org/apache/aries/blueprint/AbstractBlueprintTest.java (original)
+++ aries/trunk/blueprint/blueprint-core/src/test/java/org/apache/aries/blueprint/AbstractBlueprintTest.java Wed Jan 31 20:10:03 2018
@@ -37,30 +37,37 @@ import org.xml.sax.SAXException;
 public abstract class AbstractBlueprintTest extends TestCase {
 
     protected ComponentDefinitionRegistryImpl parse(String name) throws Exception {
-      NamespaceHandlerSet handlers = new NamespaceHandlerSet() {
+        NamespaceHandlerSet handlers = new NamespaceHandlerSet() {
             public Set<URI> getNamespaces() {
                 return null;
             }
+
             public NamespaceHandler getNamespaceHandler(URI namespace) {
                 if (ExtNamespaceHandler.isExtNamespace(namespace.toString())) {
-                  return new ExtNamespaceHandler();
+                    return new ExtNamespaceHandler();
                 } else {
-                  return null;
+                    return null;
                 }
             }
+
             public void removeListener(NamespaceHandlerSet.Listener listener) {
             }
+
             public Schema getSchema() throws SAXException, IOException {
                 return null;
             }
+
             public Schema getSchema(Map<String, String> locations) throws SAXException, IOException {
-              return null;
+                return null;
             }
+
             public boolean isComplete() {
                 return false;
             }
+
             public void addListener(NamespaceHandlerSet.Listener listener) {
             }
+
             public void destroy() {
             }
         };