You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2017/01/20 09:10:20 UTC
svn commit: r1779587 - in /river/jtsk/trunk/src:
net/jini/loader/LoadClass.java org/apache/river/concurrent/RC.java
org/apache/river/concurrent/ReferenceProcessor.java
Author: peter_firmstone
Date: Fri Jan 20 09:10:19 2017
New Revision: 1779587
URL: http://svn.apache.org/viewvc?rev=1779587&view=rev
Log:
RIVER-447 Leaked Executor Service Threads in LoadClass
Commit reviewed patch.
Modified:
river/jtsk/trunk/src/net/jini/loader/LoadClass.java
river/jtsk/trunk/src/org/apache/river/concurrent/RC.java
river/jtsk/trunk/src/org/apache/river/concurrent/ReferenceProcessor.java
Modified: river/jtsk/trunk/src/net/jini/loader/LoadClass.java
URL: http://svn.apache.org/viewvc/river/jtsk/trunk/src/net/jini/loader/LoadClass.java?rev=1779587&r1=1779586&r2=1779587&view=diff
==============================================================================
--- river/jtsk/trunk/src/net/jini/loader/LoadClass.java (original)
+++ river/jtsk/trunk/src/net/jini/loader/LoadClass.java Fri Jan 20 09:10:19 2017
@@ -20,6 +20,8 @@ package net.jini.loader;
import java.security.AccessController;
import java.security.PrivilegedAction;
+import java.util.Collection;
+import java.util.List;
import org.apache.river.concurrent.RC;
import org.apache.river.concurrent.Ref;
import org.apache.river.concurrent.Referrer;
@@ -28,10 +30,12 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.river.thread.NamedThreadFactory;
/**
@@ -97,15 +101,7 @@ public class LoadClass {
}
ExecutorService exec = loaderMap.get(loader);
if (exec == null) {
- exec = new ThreadPoolExecutor(
- 1,
- 1,
- 0,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue(),
- new NamedThreadFactory(loader.toString(), false),
- new ThreadPoolExecutor.CallerRunsPolicy()
- );
+ exec = new AutoCloseableExecutor(loader.toString());
ExecutorService existed = loaderMap.putIfAbsent(loader, exec);
if (existed != null) {
exec = existed;
@@ -178,5 +174,92 @@ public class LoadClass {
}
}
+
+ private static class AutoCloseableExecutor implements ExecutorService, AutoCloseable {
+
+ private final ExecutorService decorated;
+
+ AutoCloseableExecutor(String loaderName){
+ decorated = new ThreadPoolExecutor(
+ 1,
+ 1,
+ 0,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue(),
+ new NamedThreadFactory(loaderName, false),
+ new ThreadPoolExecutor.CallerRunsPolicy());
+ }
+
+ @Override
+ public void shutdown() {
+ decorated.shutdown();
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ return decorated.shutdownNow();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return decorated.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return decorated.isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return decorated.awaitTermination(timeout, unit);
+ }
+
+ @Override
+ public <T> Future<T> submit(Callable<T> task) {
+ return decorated.submit(task);
+ }
+
+ @Override
+ public <T> Future<T> submit(Runnable task, T result) {
+ return decorated.submit(task, result);
+ }
+
+ @Override
+ public Future<?> submit(Runnable task) {
+ return decorated.submit(task);
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+ return decorated.invokeAll(tasks);
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+ return decorated.invokeAll(tasks, timeout, unit);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+ return decorated.invokeAny(tasks);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return decorated.invokeAny(tasks, timeout, unit);
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ decorated.execute(command);
+ }
+
+ @Override
+ public void close() throws SecurityException {
+ decorated.shutdown();
+ }
+
+ }
}
Modified: river/jtsk/trunk/src/org/apache/river/concurrent/RC.java
URL: http://svn.apache.org/viewvc/river/jtsk/trunk/src/org/apache/river/concurrent/RC.java?rev=1779587&r1=1779586&r2=1779587&view=diff
==============================================================================
--- river/jtsk/trunk/src/org/apache/river/concurrent/RC.java (original)
+++ river/jtsk/trunk/src/org/apache/river/concurrent/RC.java Fri Jan 20 09:10:19 2017
@@ -80,8 +80,9 @@ import java.util.Iterator;
* removal of enqued references is performed by background Executor threads.
* Your chosen encapsulated {@link Collection} must also be mutable.
* Objects will be removed automatically from encapsulated Collections when
- * they are eligible for garbage collection, external synchronisation of
- * decorated collections is not supported.
+ * they are eligible for garbage collection, object's that implement AutoCloseable
+ * will automatically have their resources freed after removal,
+ * external synchronisation of decorated collections is not supported.
* </p><p>
* If you're using Iterators, you must synchronise on the underlying Collection
* or Map, if iterating through keys or values, this doesn't apply to
Modified: river/jtsk/trunk/src/org/apache/river/concurrent/ReferenceProcessor.java
URL: http://svn.apache.org/viewvc/river/jtsk/trunk/src/org/apache/river/concurrent/ReferenceProcessor.java?rev=1779587&r1=1779586&r2=1779587&view=diff
==============================================================================
--- river/jtsk/trunk/src/org/apache/river/concurrent/ReferenceProcessor.java (original)
+++ river/jtsk/trunk/src/org/apache/river/concurrent/ReferenceProcessor.java Fri Jan 20 09:10:19 2017
@@ -15,6 +15,7 @@
package org.apache.river.concurrent;
+import java.io.IOException;
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
@@ -190,6 +191,15 @@ class ReferenceProcessor<T> implements R
try {
for ( Object t = queue.poll(); t != null; t = queue.poll()){
col.remove(t);
+ if (t instanceof Referrer){
+ Object referent = ((Referrer)t).get();
+ if (referent instanceof AutoCloseable){
+ try{
+ // Release any resources held by the referent.
+ ((AutoCloseable) referent).close();
+ } catch (Exception ex){} // Ignore
+ }
+ }
}
}catch(Exception e){
e.printStackTrace(System.err);