You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by gn...@apache.org on 2018/01/31 20:10:04 UTC
svn commit: r1822826 [4/4] - in /aries/trunk/blueprint:
blueprint-cm/src/main/java/org/apache/aries/blueprint/compendium/cm/
blueprint-cm/src/test/java/org/apache/aries/blueprint/compendium/cm/
blueprint-core/src/main/java/org/apache/aries/blueprint/co...
Modified: aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/ScheduledExecutorServiceWrapper.java
URL: http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/ScheduledExecutorServiceWrapper.java?rev=1822826&r1=1822825&r2=1822826&view=diff
==============================================================================
--- aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/ScheduledExecutorServiceWrapper.java (original)
+++ aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/ScheduledExecutorServiceWrapper.java Wed Jan 31 20:10:03 2018
@@ -52,410 +52,407 @@ import org.osgi.framework.InvalidSyntaxE
* it picks one up from the service registry then it shuts the internal one down. This doesn't fully meet
* the spec for a ScheduledExecutorService. It does not properly implement shutdownNow, but this isn't used
* by blueprint so for now that should be fine.
- *
+ * <p>
* <p>It also wraps the Runnables and Callables so when a task is canceled we quickly clean up memory rather
- * than waiting for the target to get to the task and purge it.
+ * than waiting for the target to get to the task and purge it.
* </p>
*/
-public class ScheduledExecutorServiceWrapper implements ScheduledExecutorService, SingleServiceListener
-{
- public static interface ScheduledExecutorServiceFactory
- {
- public ScheduledExecutorService create(String name);
- }
-
- private final AtomicReference<ScheduledExecutorService> _current = new AtomicReference<ScheduledExecutorService>();
- private SingleServiceTracker<ScheduledExecutorService> _tracked;
- private final AtomicReference<ScheduledExecutorService> _default = new AtomicReference<ScheduledExecutorService>();
- private final AtomicBoolean _shutdown = new AtomicBoolean();
- private final Queue<Discardable<Runnable>> _unprocessedWork = new LinkedBlockingQueue<Discardable<Runnable>>();
- private final RWLock _lock = new RWLock();
- private final AtomicInteger _invokeEntryCount = new AtomicInteger();
- private final ScheduledExecutorServiceFactory _factory;
- private final String _name;
-
- public ScheduledExecutorServiceWrapper(BundleContext context, String name, ScheduledExecutorServiceFactory sesf)
- {
- _name = name;
- _factory = sesf;
- try {
- _tracked = new SingleServiceTracker<ScheduledExecutorService>(context, ScheduledExecutorService.class, "(aries.blueprint.poolName=" + _name + ")", this);
- _tracked.open();
- } catch (InvalidSyntaxException e) {
- // Just ignore and stick with the default one.
- }
-
- if (_current.get() == null) {
- _default.set(_factory.create(name));
- if (!!!_current.compareAndSet(null, _default.get())) {
- _default.getAndSet(null).shutdown();
- }
- }
- }
-
- public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
- {
- long timeLeftToWait = unit.toMillis(timeout);
- long pausePeriod = timeLeftToWait;
- if (pausePeriod > 1000) pausePeriod = 1000;
- while (!!!_unprocessedWork.isEmpty() && _invokeEntryCount.get() > 0 && timeLeftToWait > 0) {
- Thread.sleep(pausePeriod);
- timeLeftToWait -= pausePeriod;
- if (timeLeftToWait < pausePeriod) pausePeriod = timeLeftToWait;
- }
- return _unprocessedWork.isEmpty() && _invokeEntryCount.get() > 0;
- }
-
- public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks)
- throws InterruptedException
- {
- try {
- return runUnlessShutdown(new Callable<List<Future<T>>>() {
-
- public List<Future<T>> call() throws Exception
- {
- _invokeEntryCount.incrementAndGet();
- try {
- return _current.get().invokeAll(tasks);
- } finally {
- _invokeEntryCount.decrementAndGet();
- }
- }
-
- });
- } catch (InterruptedException e) { throw e;
- } catch (Exception e) { throw new RejectedExecutionException(); }
- }
-
- public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks,
- final long timeout,
- final TimeUnit unit) throws InterruptedException
- {
- try {
- return runUnlessShutdown(new Callable<List<Future<T>>>() {
-
- public List<Future<T>> call() throws Exception
- {
- _invokeEntryCount.incrementAndGet();
- try {
- return _current.get().invokeAll(tasks, timeout, unit);
- } finally {
- _invokeEntryCount.decrementAndGet();
- }
- }
-
- });
- } catch (InterruptedException e) { throw e;
- } catch (Exception e) { throw new RejectedExecutionException(); }
- }
-
- public <T> T invokeAny(final Collection<? extends Callable<T>> tasks) throws InterruptedException,
- ExecutionException
- {
- try {
- return runUnlessShutdown(new Callable<T>() {
-
- public T call() throws Exception
- {
- _invokeEntryCount.incrementAndGet();
- try {
- return _current.get().invokeAny(tasks);
- } finally {
- _invokeEntryCount.decrementAndGet();
- }
- }
-
- });
- } catch (InterruptedException e) { throw e;
- } catch (ExecutionException e) { throw e;
- } catch (Exception e) { throw new RejectedExecutionException(); }
- }
-
- public <T> T invokeAny(final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException
- {
- try {
- return runUnlessShutdown(new Callable<T>() {
-
- public T call() throws Exception
- {
- _invokeEntryCount.incrementAndGet();
- try {
- return _current.get().invokeAny(tasks, timeout, unit);
- } finally {
- _invokeEntryCount.decrementAndGet();
- }
- }
-
- });
- } catch (InterruptedException e) { throw e;
- } catch (ExecutionException e) { throw e;
- } catch (TimeoutException e) { throw e;
- } catch (Exception e) { throw new RejectedExecutionException(); }
- }
-
- public boolean isShutdown()
- {
- return _shutdown.get();
- }
-
- public boolean isTerminated()
- {
- if (isShutdown()) return _unprocessedWork.isEmpty();
- else return false;
- }
-
- public void shutdown()
- {
- _lock.runWriteOperation(new Runnable() {
-
- public void run()
- {
- _shutdown.set(true);
+public class ScheduledExecutorServiceWrapper implements ScheduledExecutorService, SingleServiceListener {
+ public static interface ScheduledExecutorServiceFactory {
+ public ScheduledExecutorService create(String name);
+ }
+
+ private final AtomicReference<ScheduledExecutorService> _current = new AtomicReference<ScheduledExecutorService>();
+ private SingleServiceTracker<ScheduledExecutorService> _tracked;
+ private final AtomicReference<ScheduledExecutorService> _default = new AtomicReference<ScheduledExecutorService>();
+ private final AtomicBoolean _shutdown = new AtomicBoolean();
+ private final Queue<Discardable<Runnable>> _unprocessedWork = new LinkedBlockingQueue<Discardable<Runnable>>();
+ private final RWLock _lock = new RWLock();
+ private final AtomicInteger _invokeEntryCount = new AtomicInteger();
+ private final ScheduledExecutorServiceFactory _factory;
+ private final String _name;
+
+ public ScheduledExecutorServiceWrapper(BundleContext context, String name, ScheduledExecutorServiceFactory sesf) {
+ _name = name;
+ _factory = sesf;
+ try {
+ _tracked = new SingleServiceTracker<ScheduledExecutorService>(context, ScheduledExecutorService.class, "(aries.blueprint.poolName=" + _name + ")", this);
+ _tracked.open();
+ } catch (InvalidSyntaxException e) {
+ // Just ignore and stick with the default one.
+ }
+
+ if (_current.get() == null) {
+ _default.set(_factory.create(name));
+ if (!!!_current.compareAndSet(null, _default.get())) {
+ _default.getAndSet(null).shutdown();
+ }
+ }
+ }
+
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ // Done only when no wrapped work is queued AND no invokeAll/invokeAny call is in flight; polls in sleeps of at most 1s.
+ long timeLeftToWait = unit.toMillis(timeout);
+ long pausePeriod = Math.min(timeLeftToWait, 1000);
+ while ((!_unprocessedWork.isEmpty() || _invokeEntryCount.get() > 0) && timeLeftToWait > 0) {
+ Thread.sleep(pausePeriod);
+ timeLeftToWait -= pausePeriod;
+ if (timeLeftToWait < pausePeriod) pausePeriod = timeLeftToWait;
+ }
+ return _unprocessedWork.isEmpty() && _invokeEntryCount.get() == 0; // was "> 0", which reported false once idle
+ }
+
+ public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks)
+ throws InterruptedException {
+ try {
+ return runUnlessShutdown(new Callable<List<Future<T>>>() {
+
+ public List<Future<T>> call() throws Exception {
+ _invokeEntryCount.incrementAndGet();
+ try {
+ return _current.get().invokeAll(tasks);
+ } finally {
+ _invokeEntryCount.decrementAndGet();
+ }
+ }
+
+ });
+ } catch (InterruptedException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RejectedExecutionException();
+ }
+ }
+
+ public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks,
+ final long timeout,
+ final TimeUnit unit) throws InterruptedException {
+ try {
+ return runUnlessShutdown(new Callable<List<Future<T>>>() {
+
+ public List<Future<T>> call() throws Exception {
+ _invokeEntryCount.incrementAndGet();
+ try {
+ return _current.get().invokeAll(tasks, timeout, unit);
+ } finally {
+ _invokeEntryCount.decrementAndGet();
+ }
+ }
+
+ });
+ } catch (InterruptedException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RejectedExecutionException();
+ }
+ }
+
+ public <T> T invokeAny(final Collection<? extends Callable<T>> tasks) throws InterruptedException,
+ ExecutionException {
+ try {
+ return runUnlessShutdown(new Callable<T>() {
+
+ public T call() throws Exception {
+ _invokeEntryCount.incrementAndGet();
+ try {
+ return _current.get().invokeAny(tasks);
+ } finally {
+ _invokeEntryCount.decrementAndGet();
+ }
+ }
+
+ });
+ } catch (InterruptedException e) {
+ throw e;
+ } catch (ExecutionException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RejectedExecutionException();
+ }
+ }
+
+ public <T> T invokeAny(final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ try {
+ return runUnlessShutdown(new Callable<T>() {
+
+ public T call() throws Exception {
+ _invokeEntryCount.incrementAndGet();
+ try {
+ return _current.get().invokeAny(tasks, timeout, unit);
+ } finally {
+ _invokeEntryCount.decrementAndGet();
+ }
+ }
+
+ });
+ } catch (InterruptedException e) {
+ throw e;
+ } catch (ExecutionException e) {
+ throw e;
+ } catch (TimeoutException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RejectedExecutionException();
+ }
+ }
+
+ public boolean isShutdown() {
+ return _shutdown.get();
+ }
+
+ public boolean isTerminated() {
+ if (isShutdown()) return _unprocessedWork.isEmpty();
+ else return false;
+ }
+
+ public void shutdown() {
+ _lock.runWriteOperation(new Runnable() {
+
+ public void run() {
+ _shutdown.set(true);
+ ScheduledExecutorService s = _default.get();
+
+ if (s != null) s.shutdown();
+ }
+ });
+ }
+
+ public List<Runnable> shutdownNow() {
+ try {
+ return _lock.runWriteOperation(new Callable<List<Runnable>>() {
+
+ public List<Runnable> call() {
+ _shutdown.set(true);
+
+ ScheduledExecutorService s = _default.get();
+
+ if (s != null) s.shutdownNow();
+
+ List<Runnable> runnables = new ArrayList<Runnable>();
+
+ for (Discardable<Runnable> r : _unprocessedWork) {
+ Runnable newRunnable = r.discard();
+ if (newRunnable != null) {
+ runnables.add(newRunnable);
+ }
+ }
+
+ return runnables;
+ }
+
+ });
+ } catch (Exception e) {
+ // This wont happen since our callable doesn't throw any exceptions, so we just return an empty list
+ return Collections.emptyList();
+ }
+ }
+
+ public <T> Future<T> submit(final Callable<T> task) {
+ try {
+ return runUnlessShutdown(new Callable<Future<T>>() {
+
+ public Future<T> call() throws Exception {
+ DiscardableCallable<T> t = new DiscardableCallable<T>(task, _unprocessedWork);
+ try {
+ return new WrappedFuture<T>(_current.get().submit((Callable<T>) t), t);
+ } catch (RuntimeException e) {
+ t.discard();
+ throw e;
+ }
+ }
+
+ });
+ } catch (Exception e) {
+ throw new RejectedExecutionException();
+ }
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public Future<?> submit(final Runnable task) {
+ try {
+ return runUnlessShutdown(new Callable<Future<?>>() {
+
+ public Future<?> call() {
+ DiscardableRunnable t = new DiscardableRunnable(task, _unprocessedWork);
+ try {
+ return new WrappedFuture(_current.get().submit(t), t);
+ } catch (RuntimeException e) {
+ t.discard();
+ throw e;
+ }
+ }
+ });
+ } catch (Exception e) {
+ throw new RejectedExecutionException();
+ }
+ }
+
+ public <T> Future<T> submit(final Runnable task, final T result) {
+ try {
+ return runUnlessShutdown(new Callable<Future<T>>() {
+
+ public Future<T> call() {
+ DiscardableRunnable t = new DiscardableRunnable(task, _unprocessedWork);
+ try {
+ return new WrappedFuture<T>(_current.get().submit(t, result), t);
+ } catch (RuntimeException e) {
+ t.discard();
+ throw e;
+ }
+ }
+ });
+ } catch (Exception e) {
+ throw new RejectedExecutionException();
+ }
+ }
+
+ public void execute(final Runnable command) {
+ try {
+ runUnlessShutdown(new Callable<Object>() {
+
+ public Object call() {
+ DiscardableRunnable t = new DiscardableRunnable(command, _unprocessedWork);
+ try {
+ _current.get().execute(t);
+ } catch (RuntimeException e) {
+ t.discard();
+ throw e;
+ }
+ return null;
+ }
+ });
+ } catch (Exception e) {
+ throw new RejectedExecutionException();
+ }
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) {
+ try {
+ return runUnlessShutdown(new Callable<ScheduledFuture<?>>() {
+
+ public ScheduledFuture<?> call() {
+ DiscardableRunnable t = new DiscardableRunnable(command, _unprocessedWork);
+ try {
+ return new WrappedScheduledFuture(_current.get().schedule(t, delay, unit), t);
+ } catch (RuntimeException e) {
+ t.discard();
+ throw e;
+ }
+ }
+ });
+ } catch (Exception e) {
+ throw new RejectedExecutionException();
+ }
+ }
+
+ public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit) {
+ try {
+ return runUnlessShutdown(new Callable<ScheduledFuture<V>>() {
+
+ public ScheduledFuture<V> call() {
+ DiscardableCallable<V> c = new DiscardableCallable<V>(callable, _unprocessedWork);
+ try {
+ return new WrappedScheduledFuture<V>(_current.get().schedule((Callable<V>) c, delay, unit), c);
+ } catch (RuntimeException e) {
+ c.discard();
+ throw e;
+ }
+ }
+ });
+ } catch (Exception e) {
+ throw new RejectedExecutionException();
+ }
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period,
+ final TimeUnit unit) {
+ try {
+ return runUnlessShutdown(new Callable<ScheduledFuture<?>>() {
+
+ public ScheduledFuture<?> call() {
+ DiscardableRunnable t = new DiscardableRunnable(command, _unprocessedWork);
+ try {
+ return new WrappedScheduledFuture(_current.get().scheduleAtFixedRate(t, initialDelay, period, unit), t);
+ } catch (RuntimeException e) {
+ t.discard();
+ throw e;
+ }
+ }
+ });
+ } catch (Exception e) {
+ throw new RejectedExecutionException();
+ }
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay,
+ final TimeUnit unit) {
+ try {
+ return runUnlessShutdown(new Callable<ScheduledFuture<?>>() {
+
+ public ScheduledFuture<?> call() {
+ DiscardableRunnable t = new DiscardableRunnable(command, _unprocessedWork);
+ try {
+ return new WrappedScheduledFuture(_current.get().scheduleWithFixedDelay(t, initialDelay, delay, unit), t);
+ } catch (RuntimeException e) {
+ t.discard();
+ throw e;
+ }
+ }
+ });
+ } catch (Exception e) {
+ throw new RejectedExecutionException();
+ }
+ }
+
+ public void serviceFound() {
ScheduledExecutorService s = _default.get();
-
- if (s != null) s.shutdown();
- }
- });
- }
-
- public List<Runnable> shutdownNow()
- {
- try {
- return _lock.runWriteOperation(new Callable<List<Runnable>>() {
-
- public List<Runnable> call()
- {
- _shutdown.set(true);
-
- ScheduledExecutorService s = _default.get();
-
- if (s != null) s.shutdownNow();
-
- List<Runnable> runnables = new ArrayList<Runnable>();
-
- for (Discardable<Runnable> r : _unprocessedWork) {
- Runnable newRunnable = r.discard();
- if (newRunnable != null) {
- runnables.add(newRunnable);
+ if (_current.compareAndSet(s, _tracked.getService())) {
+ if (s != null) {
+ if (_default.compareAndSet(s, null)) {
+ s.shutdown();
+ }
}
- }
-
- return runnables;
- }
-
- });
- } catch (Exception e) {
- // This wont happen since our callable doesn't throw any exceptions, so we just return an empty list
- return Collections.emptyList();
- }
- }
-
- public <T> Future<T> submit(final Callable<T> task)
- {
- try {
- return runUnlessShutdown(new Callable<Future<T>>() {
-
- public Future<T> call() throws Exception
- {
- DiscardableCallable<T> t = new DiscardableCallable<T>(task, _unprocessedWork);
- try {
- return new WrappedFuture<T>(_current.get().submit((Callable<T>)t), t) ;
- } catch (RuntimeException e) {
- t.discard();
- throw e;
- }
- }
-
- });
- } catch (Exception e) { throw new RejectedExecutionException(); }
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public Future<?> submit(final Runnable task)
- {
- try {
- return runUnlessShutdown(new Callable<Future<?>>() {
-
- public Future<?> call()
- {
- DiscardableRunnable t = new DiscardableRunnable(task, _unprocessedWork);
- try {
- return new WrappedFuture(_current.get().submit(t), t);
- } catch (RuntimeException e) {
- t.discard();
- throw e;
- }
- }
- });
- } catch (Exception e) { throw new RejectedExecutionException(); }
- }
-
- public <T> Future<T> submit(final Runnable task, final T result)
- {
- try {
- return runUnlessShutdown(new Callable<Future<T>>() {
-
- public Future<T> call()
- {
- DiscardableRunnable t = new DiscardableRunnable(task, _unprocessedWork);
- try {
- return new WrappedFuture<T>(_current.get().submit(t, result), t);
- } catch (RuntimeException e) {
- t.discard();
- throw e;
- }
- }
- });
- } catch (Exception e) { throw new RejectedExecutionException(); }
- }
-
- public void execute(final Runnable command)
- {
- try {
- runUnlessShutdown(new Callable<Object>() {
-
- public Object call()
- {
- DiscardableRunnable t = new DiscardableRunnable(command, _unprocessedWork);
- try {
- _current.get().execute(t);
- } catch (RuntimeException e) {
- t.discard();
- throw e;
- }
- return null;
- }
- });
- } catch (Exception e) { throw new RejectedExecutionException(); }
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit)
- {
- try {
- return runUnlessShutdown(new Callable<ScheduledFuture<?>>() {
-
- public ScheduledFuture<?> call()
- {
- DiscardableRunnable t = new DiscardableRunnable(command, _unprocessedWork);
- try {
- return new WrappedScheduledFuture(_current.get().schedule(t, delay, unit), t);
- } catch (RuntimeException e) {
- t.discard();
- throw e;
- }
- }
- });
- } catch (Exception e) { throw new RejectedExecutionException(); }
- }
-
- public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit)
- {
- try {
- return runUnlessShutdown(new Callable<ScheduledFuture<V>>() {
-
- public ScheduledFuture<V> call()
- {
- DiscardableCallable<V> c = new DiscardableCallable<V>(callable, _unprocessedWork);
- try {
- return new WrappedScheduledFuture<V>(_current.get().schedule((Callable<V>)c, delay, unit), c);
- } catch (RuntimeException e) {
- c.discard();
- throw e;
- }
- }
- });
- } catch (Exception e) { throw new RejectedExecutionException(); }
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public ScheduledFuture<?> scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period,
- final TimeUnit unit)
- {
- try {
- return runUnlessShutdown(new Callable<ScheduledFuture<?>>() {
-
- public ScheduledFuture<?> call()
- {
- DiscardableRunnable t = new DiscardableRunnable(command, _unprocessedWork);
- try {
- return new WrappedScheduledFuture(_current.get().scheduleAtFixedRate(t, initialDelay, period, unit), t);
- } catch (RuntimeException e) {
- t.discard();
- throw e;
- }
- }
- });
- } catch (Exception e) { throw new RejectedExecutionException(); }
- }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay,
- final TimeUnit unit)
- {
- try {
- return runUnlessShutdown(new Callable<ScheduledFuture<?>>() {
-
- public ScheduledFuture<?> call()
- {
- DiscardableRunnable t = new DiscardableRunnable(command, _unprocessedWork);
- try {
- return new WrappedScheduledFuture(_current.get().scheduleWithFixedDelay(t, initialDelay, delay, unit), t);
- } catch (RuntimeException e) {
- t.discard();
- throw e;
- }
- }
- });
- } catch (Exception e) { throw new RejectedExecutionException(); }
- }
-
- public void serviceFound()
- {
- ScheduledExecutorService s = _default.get();
- if (_current.compareAndSet(s, _tracked.getService())) {
- if (s != null) {
- if (_default.compareAndSet(s, null)) {
- s.shutdown();
- }
- }
- }
- }
-
- // TODO when lost or replaced we need to move work to the "new" _current. This is a huge change because the futures are not currently stored.
- public void serviceLost()
- {
- ScheduledExecutorService s = _default.get();
-
- if (s == null) {
- s = _factory.create(_name);
- if (_default.compareAndSet(null, s)) {
- _current.set(s);
- }
- }
- }
-
- public void serviceReplaced()
- {
- _current.set(_tracked.getService());
- }
-
- private <T> T runUnlessShutdown(final Callable<T> call) throws InterruptedException, ExecutionException, TimeoutException
- {
- try {
- return _lock.runReadOperation(new Callable<T>()
- {
- public T call() throws Exception
- {
- if (isShutdown()) throw new RejectedExecutionException();
- return call.call();
+ }
+ }
+
+ // TODO when lost or replaced we need to move work to the "new" _current. This is a huge change because the futures are not currently stored.
+ public void serviceLost() {
+ ScheduledExecutorService s = _default.get();
+
+ if (s == null) {
+ s = _factory.create(_name);
+ if (_default.compareAndSet(null, s)) {
+ _current.set(s);
}
- });
- } catch (InterruptedException e) { throw e;
- } catch (ExecutionException e) { throw e;
- } catch (TimeoutException e) { throw e;
- } catch (RuntimeException e) { throw e;
- } catch (Exception e) { throw new RejectedExecutionException(); }
- }
+ }
+ }
+
+ public void serviceReplaced() {
+ _current.set(_tracked.getService());
+ }
+
+ private <T> T runUnlessShutdown(final Callable<T> call) throws InterruptedException, ExecutionException, TimeoutException {
+ try {
+ return _lock.runReadOperation(new Callable<T>() {
+ public T call() throws Exception {
+ if (isShutdown()) throw new RejectedExecutionException();
+ return call.call();
+ }
+ });
+ } catch (InterruptedException e) {
+ throw e;
+ } catch (ExecutionException e) {
+ throw e;
+ } catch (TimeoutException e) {
+ throw e;
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RejectedExecutionException();
+ }
+ }
}
\ No newline at end of file
Modified: aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/Discardable.java
URL: http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/Discardable.java?rev=1822826&r1=1822825&r2=1822826&view=diff
==============================================================================
--- aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/Discardable.java (original)
+++ aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/Discardable.java Wed Jan 31 20:10:03 2018
@@ -19,5 +19,5 @@
package org.apache.aries.blueprint.utils.threading.impl;
public interface Discardable<T> {
- public <T> T discard();
+ T discard(); // returns the interface's own T; the former method-level <T> shadowed it and lost type safety
}
\ No newline at end of file
Modified: aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableCallable.java
URL: http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableCallable.java?rev=1822826&r1=1822825&r2=1822826&view=diff
==============================================================================
--- aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableCallable.java (original)
+++ aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableCallable.java Wed Jan 31 20:10:03 2018
@@ -25,44 +25,39 @@ import java.util.concurrent.Cancellation
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
-public class DiscardableCallable<V> implements Callable<V>, Runnable, Discardable<Runnable>
-{
- private AtomicReference<Callable<V>> c = new AtomicReference<Callable<V>>();
- private Queue<Discardable<Runnable>> _removeFromListOnCall;
-
- public DiscardableCallable(Callable<V> call, Queue<Discardable<Runnable>> _unprocessedWork) {
- c.set(call);
- _removeFromListOnCall = _unprocessedWork;
- _removeFromListOnCall.add(this);
- }
-
- private DiscardableCallable(Callable<V> call)
- {
- c.set(call);
- _removeFromListOnCall = new LinkedBlockingQueue<Discardable<Runnable>>();
- }
-
- public Runnable discard()
- {
- _removeFromListOnCall.remove(this);
- return new DiscardableCallable<V>(c.getAndSet(null)) ;
- }
-
- public V call() throws Exception
- {
- _removeFromListOnCall.remove(this);
- Callable<V> call = c.get();
- if (call != null) {
- return call.call();
+public class DiscardableCallable<V> implements Callable<V>, Runnable, Discardable<Runnable> {
+ private AtomicReference<Callable<V>> c = new AtomicReference<Callable<V>>();
+ private Queue<Discardable<Runnable>> _removeFromListOnCall;
+
+ public DiscardableCallable(Callable<V> call, Queue<Discardable<Runnable>> _unprocessedWork) {
+ c.set(call);
+ _removeFromListOnCall = _unprocessedWork;
+ _removeFromListOnCall.add(this);
+ }
+
+ private DiscardableCallable(Callable<V> call) {
+ c.set(call);
+ _removeFromListOnCall = new LinkedBlockingQueue<Discardable<Runnable>>();
+ }
+
+ public Runnable discard() {
+ _removeFromListOnCall.remove(this);
+ return new DiscardableCallable<V>(c.getAndSet(null));
+ }
+
+ public V call() throws Exception {
+ _removeFromListOnCall.remove(this);
+ Callable<V> call = c.get();
+ if (call != null) {
+ return call.call();
+ }
+ throw new CancellationException();
}
- throw new CancellationException();
- }
- public void run()
- {
- try {
- call();
- } catch (Exception e) {
+ public void run() {
+ try {
+ call();
+ } catch (Exception e) {
+ }
}
- }
}
\ No newline at end of file
Modified: aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableRunnable.java
URL: http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableRunnable.java?rev=1822826&r1=1822825&r2=1822826&view=diff
==============================================================================
--- aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableRunnable.java (original)
+++ aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/DiscardableRunnable.java Wed Jan 31 20:10:03 2018
@@ -23,35 +23,31 @@ import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
-public class DiscardableRunnable implements Runnable, Discardable<Runnable>
-{
- private AtomicReference<Runnable> r = new AtomicReference<Runnable>();
- private Queue<Discardable<Runnable>> _removeFromListOnRun;
-
- public DiscardableRunnable(Runnable run, Queue<Discardable<Runnable>> _unprocessedWork) {
- r.set(run);
- _removeFromListOnRun = _unprocessedWork;
- _removeFromListOnRun.add(this);
- }
+public class DiscardableRunnable implements Runnable, Discardable<Runnable> {
+ private AtomicReference<Runnable> r = new AtomicReference<Runnable>();
+ private Queue<Discardable<Runnable>> _removeFromListOnRun;
- private DiscardableRunnable(Runnable run)
- {
- r.set(run);
- _removeFromListOnRun = new LinkedBlockingQueue<Discardable<Runnable>>();
- }
+ public DiscardableRunnable(Runnable run, Queue<Discardable<Runnable>> _unprocessedWork) {
+ r.set(run);
+ _removeFromListOnRun = _unprocessedWork;
+ _removeFromListOnRun.add(this);
+ }
+
+ private DiscardableRunnable(Runnable run) {
+ r.set(run);
+ _removeFromListOnRun = new LinkedBlockingQueue<Discardable<Runnable>>();
+ }
- public void run()
- {
- _removeFromListOnRun.remove(this);
- Runnable run = r.get();
- if (run != null) {
- run.run();
+ public void run() {
+ _removeFromListOnRun.remove(this);
+ Runnable run = r.get();
+ if (run != null) {
+ run.run();
+ }
}
- }
- public Runnable discard()
- {
- _removeFromListOnRun.remove(this);
- return new DiscardableRunnable(r.getAndSet(null));
- }
+ public Runnable discard() {
+ _removeFromListOnRun.remove(this);
+ return new DiscardableRunnable(r.getAndSet(null));
+ }
}
\ No newline at end of file
Modified: aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedFuture.java
URL: http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedFuture.java?rev=1822826&r1=1822825&r2=1822826&view=diff
==============================================================================
--- aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedFuture.java (original)
+++ aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedFuture.java Wed Jan 31 20:10:03 2018
@@ -23,43 +23,37 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-public class WrappedFuture<T> implements Future<T>
-{
- private Discardable<?> _discardable;
- private Future<T> _future;
-
- public WrappedFuture(Future<T> f, Discardable<?> d) {
- _future = f;
- _discardable = d;
- }
-
- public boolean cancel(boolean arg0)
- {
- boolean result = _future.cancel(arg0);
-
- if (result) _discardable.discard();
-
- return result;
- }
-
- public T get() throws InterruptedException, ExecutionException
- {
- return _future.get();
- }
-
- public T get(long timeout, TimeUnit timeunit) throws InterruptedException, ExecutionException,
- TimeoutException
- {
- return _future.get(timeout, timeunit);
- }
-
- public boolean isCancelled()
- {
- return _future.isCancelled();
- }
-
- public boolean isDone()
- {
- return _future.isDone();
- }
+public class WrappedFuture<T> implements Future<T> {
+ private Discardable<?> _discardable;
+ private Future<T> _future;
+
+ public WrappedFuture(Future<T> f, Discardable<?> d) {
+ _future = f;
+ _discardable = d;
+ }
+
+ public boolean cancel(boolean arg0) {
+ boolean result = _future.cancel(arg0);
+
+ if (result) _discardable.discard();
+
+ return result;
+ }
+
+ public T get() throws InterruptedException, ExecutionException {
+ return _future.get();
+ }
+
+ public T get(long timeout, TimeUnit timeunit) throws InterruptedException, ExecutionException,
+ TimeoutException {
+ return _future.get(timeout, timeunit);
+ }
+
+ public boolean isCancelled() {
+ return _future.isCancelled();
+ }
+
+ public boolean isDone() {
+ return _future.isDone();
+ }
}
\ No newline at end of file
Modified: aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedScheduledFuture.java
URL: http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedScheduledFuture.java?rev=1822826&r1=1822825&r2=1822826&view=diff
==============================================================================
--- aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedScheduledFuture.java (original)
+++ aries/trunk/blueprint/blueprint-core/src/main/java/org/apache/aries/blueprint/utils/threading/impl/WrappedScheduledFuture.java Wed Jan 31 20:10:03 2018
@@ -25,53 +25,45 @@ import java.util.concurrent.ScheduledFut
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-public class WrappedScheduledFuture<T> implements ScheduledFuture<T>
-{
- private Discardable<?> _discardable;
- private ScheduledFuture<T> _future;
-
- public WrappedScheduledFuture(ScheduledFuture<T> f, Discardable<?> d) {
- _future = f;
- _discardable = d;
- }
-
- public long getDelay(TimeUnit timeunit)
- {
- return _future.getDelay(timeunit);
- }
-
- public int compareTo(Delayed other)
- {
- return _future.compareTo(other);
- }
-
- public boolean cancel(boolean arg0)
- {
- boolean result = _future.cancel(arg0);
-
- if (result) _discardable.discard();
-
- return result;
- }
-
- public T get() throws InterruptedException, ExecutionException
- {
- return _future.get();
- }
-
- public T get(long timeout, TimeUnit timeunit) throws InterruptedException, ExecutionException,
- TimeoutException
- {
- return _future.get(timeout, timeunit);
- }
-
- public boolean isCancelled()
- {
- return _future.isCancelled();
- }
-
- public boolean isDone()
- {
- return _future.isDone();
- }
+public class WrappedScheduledFuture<T> implements ScheduledFuture<T> { // ScheduledFuture decorator: forwards every call to a wrapped ScheduledFuture and discards a Discardable when a cancel succeeds
+ private Discardable<?> _discardable; // object whose discard() is invoked after a successful cancel
+ private ScheduledFuture<T> _future; // delegate that all ScheduledFuture methods forward to
+
+ public WrappedScheduledFuture(ScheduledFuture<T> f, Discardable<?> d) { // f becomes the delegate, d the discard target; neither is null-checked here
+ _future = f;
+ _discardable = d;
+ }
+
+ public long getDelay(TimeUnit timeunit) { // delay as reported by the delegate for the requested unit
+ return _future.getDelay(timeunit);
+ }
+
+ public int compareTo(Delayed other) { // ordering is decided by the delegate; 'other' is passed through as given (not unwrapped)
+ return _future.compareTo(other);
+ }
+
+ public boolean cancel(boolean arg0) { // arg0 is handed unchanged to the delegate's cancel (mayInterruptIfRunning in the Future API)
+ boolean result = _future.cancel(arg0);
+
+ if (result) _discardable.discard(); // discard only when the delegate reports that the cancel took effect
+
+ return result; // the delegate's answer, returned as-is
+ }
+
+ public T get() throws InterruptedException, ExecutionException { // pure delegation: result and exceptions come from the wrapped future
+ return _future.get();
+ }
+
+ public T get(long timeout, TimeUnit timeunit) throws InterruptedException, ExecutionException,
+ TimeoutException { // timed variant; timeout and unit are passed through untouched
+ return _future.get(timeout, timeunit);
+ }
+
+ public boolean isCancelled() { // reports the delegate's cancelled state
+ return _future.isCancelled();
+ }
+
+ public boolean isDone() { // reports the delegate's completion state
+ return _future.isDone();
+ }
}
\ No newline at end of file
Modified: aries/trunk/blueprint/blueprint-core/src/test/java/org/apache/aries/blueprint/AbstractBlueprintTest.java
URL: http://svn.apache.org/viewvc/aries/trunk/blueprint/blueprint-core/src/test/java/org/apache/aries/blueprint/AbstractBlueprintTest.java?rev=1822826&r1=1822825&r2=1822826&view=diff
==============================================================================
--- aries/trunk/blueprint/blueprint-core/src/test/java/org/apache/aries/blueprint/AbstractBlueprintTest.java (original)
+++ aries/trunk/blueprint/blueprint-core/src/test/java/org/apache/aries/blueprint/AbstractBlueprintTest.java Wed Jan 31 20:10:03 2018
@@ -37,30 +37,37 @@ import org.xml.sax.SAXException;
public abstract class AbstractBlueprintTest extends TestCase {
protected ComponentDefinitionRegistryImpl parse(String name) throws Exception {
- NamespaceHandlerSet handlers = new NamespaceHandlerSet() {
+ NamespaceHandlerSet handlers = new NamespaceHandlerSet() {
public Set<URI> getNamespaces() {
return null;
}
+
public NamespaceHandler getNamespaceHandler(URI namespace) {
if (ExtNamespaceHandler.isExtNamespace(namespace.toString())) {
- return new ExtNamespaceHandler();
+ return new ExtNamespaceHandler();
} else {
- return null;
+ return null;
}
}
+
public void removeListener(NamespaceHandlerSet.Listener listener) {
}
+
public Schema getSchema() throws SAXException, IOException {
return null;
}
+
public Schema getSchema(Map<String, String> locations) throws SAXException, IOException {
- return null;
+ return null;
}
+
public boolean isComplete() {
return false;
}
+
public void addListener(NamespaceHandlerSet.Listener listener) {
}
+
public void destroy() {
}
};