You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2017/01/20 09:10:20 UTC

svn commit: r1779587 - in /river/jtsk/trunk/src: net/jini/loader/LoadClass.java org/apache/river/concurrent/RC.java org/apache/river/concurrent/ReferenceProcessor.java

Author: peter_firmstone
Date: Fri Jan 20 09:10:19 2017
New Revision: 1779587

URL: http://svn.apache.org/viewvc?rev=1779587&view=rev
Log:
RIVER-447 Leaked Executor Service Threads in LoadClass

Commit reviewed patch.

Modified:
    river/jtsk/trunk/src/net/jini/loader/LoadClass.java
    river/jtsk/trunk/src/org/apache/river/concurrent/RC.java
    river/jtsk/trunk/src/org/apache/river/concurrent/ReferenceProcessor.java

Modified: river/jtsk/trunk/src/net/jini/loader/LoadClass.java
URL: http://svn.apache.org/viewvc/river/jtsk/trunk/src/net/jini/loader/LoadClass.java?rev=1779587&r1=1779586&r2=1779587&view=diff
==============================================================================
--- river/jtsk/trunk/src/net/jini/loader/LoadClass.java (original)
+++ river/jtsk/trunk/src/net/jini/loader/LoadClass.java Fri Jan 20 09:10:19 2017
@@ -20,6 +20,8 @@ package net.jini.loader;
 
 import java.security.AccessController;
 import java.security.PrivilegedAction;
+import java.util.Collection;
+import java.util.List;
 import org.apache.river.concurrent.RC;
 import org.apache.river.concurrent.Ref;
 import org.apache.river.concurrent.Referrer;
@@ -28,10 +30,12 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.apache.river.thread.NamedThreadFactory;
 
 /**
@@ -97,15 +101,7 @@ public class LoadClass {
         }
         ExecutorService exec = loaderMap.get(loader);
         if (exec == null) {
-            exec = new ThreadPoolExecutor(
-                    1,
-                    1,
-                    0,
-                    TimeUnit.SECONDS,
-                    new LinkedBlockingQueue(),
-                    new NamedThreadFactory(loader.toString(), false),
-                    new ThreadPoolExecutor.CallerRunsPolicy()
-            );
+            exec = new AutoCloseableExecutor(loader.toString());
             ExecutorService existed = loaderMap.putIfAbsent(loader, exec);
             if (existed != null) {
                 exec = existed;
@@ -178,5 +174,92 @@ public class LoadClass {
         }
 
     }
+    
+    private static class AutoCloseableExecutor implements ExecutorService, AutoCloseable {
+
+	private final ExecutorService decorated;
+	
+	AutoCloseableExecutor(String loaderName){
+	    decorated = new ThreadPoolExecutor(
+                    1,
+                    1,
+                    0,
+                    TimeUnit.SECONDS,
+                    new LinkedBlockingQueue(),
+                    new NamedThreadFactory(loaderName, false),
+                    new ThreadPoolExecutor.CallerRunsPolicy());
+	}
+	
+	@Override
+	public void shutdown() {
+	    decorated.shutdown();
+	}
+
+	@Override
+	public List<Runnable> shutdownNow() {
+	    return decorated.shutdownNow();
+	}
+
+	@Override
+	public boolean isShutdown() {
+	    return decorated.isShutdown();
+	}
+
+	@Override
+	public boolean isTerminated() {
+	    return decorated.isTerminated();
+	}
+
+	@Override
+	public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+	    return decorated.awaitTermination(timeout, unit);
+	}
+
+	@Override
+	public <T> Future<T> submit(Callable<T> task) {
+	    return decorated.submit(task);
+	}
+
+	@Override
+	public <T> Future<T> submit(Runnable task, T result) {
+	    return decorated.submit(task, result);
+	}
+
+	@Override
+	public Future<?> submit(Runnable task) {
+	    return decorated.submit(task);
+	}
+
+	@Override
+	public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+	    return decorated.invokeAll(tasks);
+	}
+
+	@Override
+	public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+	    return decorated.invokeAll(tasks, timeout, unit);
+	}
+
+	@Override
+	public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+	    return decorated.invokeAny(tasks);
+	}
+
+	@Override
+	public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+	    return decorated.invokeAny(tasks, timeout, unit);
+	}
+
+	@Override
+	public void execute(Runnable command) {
+	    decorated.execute(command);
+	}
+
+	@Override
+	public void close() throws SecurityException {
+	    decorated.shutdown();
+	}
+	
+    }
 
 }

Modified: river/jtsk/trunk/src/org/apache/river/concurrent/RC.java
URL: http://svn.apache.org/viewvc/river/jtsk/trunk/src/org/apache/river/concurrent/RC.java?rev=1779587&r1=1779586&r2=1779587&view=diff
==============================================================================
--- river/jtsk/trunk/src/org/apache/river/concurrent/RC.java (original)
+++ river/jtsk/trunk/src/org/apache/river/concurrent/RC.java Fri Jan 20 09:10:19 2017
@@ -80,8 +80,9 @@ import java.util.Iterator;
  * removal of enqued references is performed by background Executor threads.
  * Your chosen encapsulated {@link Collection} must also be mutable.  
  * Objects will be removed automatically from encapsulated Collections when 
- * they are eligible for garbage collection, external synchronisation of 
- * decorated collections is not supported.
+ * they are eligible for garbage collection, object's that implement AutoCloseable
+ * will automatically have their resources freed after removal,
+ * external synchronisation of decorated collections is not supported.  
  * </p><p>
  * If you're using Iterators, you must synchronise on the underlying Collection
  * or Map, if iterating through keys or values, this doesn't apply to 

Modified: river/jtsk/trunk/src/org/apache/river/concurrent/ReferenceProcessor.java
URL: http://svn.apache.org/viewvc/river/jtsk/trunk/src/org/apache/river/concurrent/ReferenceProcessor.java?rev=1779587&r1=1779586&r2=1779587&view=diff
==============================================================================
--- river/jtsk/trunk/src/org/apache/river/concurrent/ReferenceProcessor.java (original)
+++ river/jtsk/trunk/src/org/apache/river/concurrent/ReferenceProcessor.java Fri Jan 20 09:10:19 2017
@@ -15,6 +15,7 @@
 
 package org.apache.river.concurrent;
 
+import java.io.IOException;
 import java.lang.ref.PhantomReference;
 import java.lang.ref.Reference;
 import java.lang.ref.ReferenceQueue;
@@ -190,6 +191,15 @@ class ReferenceProcessor<T> implements R
             try {
                 for ( Object t = queue.poll(); t != null; t = queue.poll()){ 
                     col.remove(t);
+		    if (t instanceof Referrer){
+			Object referent = ((Referrer)t).get();
+			if (referent instanceof AutoCloseable){
+			   try{
+			       // Release any resources held by the referent.
+			       ((AutoCloseable) referent).close();
+			   } catch (Exception ex){} // Ignore
+			} 
+		    }
                 }
             }catch(Exception e){
                 e.printStackTrace(System.err);