You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/12/15 02:35:07 UTC

svn commit: r487420 - in /tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport: AbstractRxTask.java ReceiverBase.java RxTaskPool.java bio/BioReceiver.java bio/BioReplicationTask.java nio/NioReceiver.java nio/NioReplicationTask.java

Author: fhanik
Date: Thu Dec 14 17:35:06 2006
New Revision: 487420

URL: http://svn.apache.org/viewvc?view=rev&rev=487420
Log:
Implemented the use of an executor, tasks that aren't able to run quite yet, will be added to a pool of events, this pool of events is a zero GC (RxTaskPool) and is bounded.

Modified:
    tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/AbstractRxTask.java
    tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java
    tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/RxTaskPool.java
    tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java
    tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReplicationTask.java
    tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
    tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/AbstractRxTask.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/AbstractRxTask.java?view=diff&rev=487420&r1=487419&r2=487420
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/AbstractRxTask.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/AbstractRxTask.java Thu Dec 14 17:35:06 2006
@@ -26,7 +26,7 @@
  * @author Filip Hanik
  * @version $Revision$ $Date$
  */
-public abstract class AbstractRxTask extends Thread 
+public abstract class AbstractRxTask implements Runnable
 { 
     
     public static final int OPTION_DIRECT_BUFFER = ReceiverBase.OPTION_DIRECT_BUFFER;
@@ -41,7 +41,7 @@
         this.callback = callback;
     }
 
-    public void setPool(RxTaskPool pool) {
+    public void setTaskPool(RxTaskPool pool) {
         this.pool = pool;
     }
 
@@ -57,7 +57,7 @@
         this.doRun = doRun;
     }
 
-    public RxTaskPool getPool() {
+    public RxTaskPool getTaskPool() {
         return pool;
     }
 

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java?view=diff&rev=487420&r1=487419&r2=487420
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java Thu Dec 14 17:35:06 2006
@@ -19,6 +19,11 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.ChannelReceiver;
@@ -58,8 +63,10 @@
     private long tcpSelectorTimeout = 5000;
     //how many times to search for an available socket
     private int autoBind = 100;
-    private int maxThreads = 6;
+    private int maxThreads = 15;
     private int minThreads = 6;
+    private int maxTasks = 100;
+    private int minTasks = 10;
     private boolean tcpNoDelay = true;
     private boolean soKeepAlive = false;
     private boolean ooBInline = true;
@@ -69,11 +76,23 @@
     private int soTrafficClass = 0x04 | 0x08 | 0x010;
     private int timeout = 3000; //3 seconds
     private boolean useBufferPool = true;
+    
+    private Executor executor;
 
 
     public ReceiverBase() {
     }
     
+    public void start() throws IOException {
+        if ( executor == null ) {
+            executor = new ThreadPoolExecutor(minThreads,maxThreads,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
+        }
+    }
+    
+    public void stop() {
+        if ( executor instanceof ExecutorService ) ((ExecutorService)executor).shutdown();
+    }
+    
     /**
      * getMessageListener
      *
@@ -270,7 +289,7 @@
         return listener;
     }
 
-    public RxTaskPool getPool() {
+    public RxTaskPool getTaskPool() {
         return pool;
     }
     
@@ -335,6 +354,22 @@
         return securePort;
     }
 
+    public int getMinTasks() {
+        return minTasks;
+    }
+
+    public int getMaxTasks() {
+        return maxTasks;
+    }
+
+    public Executor getExecutor() {
+        return executor;
+    }
+
+    public boolean isListening() {
+        return listen;
+    }
+
     /**
      * @deprecated use setSelectorTimeout
      * @param selTimeout long
@@ -429,7 +464,20 @@
         this.securePort = securePort;
     }
 
+    public void setMinTasks(int minTasks) {
+        this.minTasks = minTasks;
+    }
+
+    public void setMaxTasks(int maxTasks) {
+        this.maxTasks = maxTasks;
+    }
+
+    public void setExecutor(Executor executor) {
+        this.executor = executor;
+    }
+
     public void heartbeat() {
         //empty operation
     }
+    
 }

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/RxTaskPool.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/RxTaskPool.java?view=diff&rev=487420&r1=487419&r2=487420
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/RxTaskPool.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/RxTaskPool.java Thu Dec 14 17:35:06 2006
@@ -19,6 +19,7 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.ThreadFactory;
 
 /**
  * @author not attributable
@@ -40,8 +41,8 @@
     boolean running = true;
     
     private static int counter = 1;
-    private int maxThreads;
-    private int minThreads;
+    private int maxTasks;
+    private int minTasks;
     
     private TaskCreator creator = null;
 
@@ -50,34 +51,27 @@
     }
 
     
-    public RxTaskPool (int maxThreads, int minThreads, TaskCreator creator) throws Exception {
+    public RxTaskPool (int maxTasks, int minTasks, TaskCreator creator) throws Exception {
         // fill up the pool with worker threads
-        this.maxThreads = maxThreads;
-        this.minThreads = minThreads;
+        this.maxTasks = maxTasks;
+        this.minTasks = minTasks;
         this.creator = creator;
-        //for (int i = 0; i < minThreads; i++) {
-        for (int i = 0; i < maxThreads; i++) { //temporary fix for thread hand off problem
-            AbstractRxTask thread = creator.getWorkerThread();
-            setupThread(thread);
-            idle.add (thread);
-        }
     }
     
-    protected void setupThread(AbstractRxTask thread) {
-        synchronized (thread) {
-            thread.setPool(this);
-            thread.setName(thread.getClass().getName() + "[" + inc() + "]");
-            thread.setDaemon(true);
-            thread.setPriority(Thread.MAX_PRIORITY);
-            thread.start();
-            try {thread.wait(500); }catch ( InterruptedException x ) {}
+    protected void configureTask(AbstractRxTask task) {
+        synchronized (task) {
+            task.setTaskPool(this);
+//            task.setName(task.getClass().getName() + "[" + inc() + "]");
+//            task.setDaemon(true);
+//            task.setPriority(Thread.MAX_PRIORITY);
+//            task.start();
         }
     }
 
     /**
      * Find an idle worker thread, if any.  Could return null.
      */
-    public AbstractRxTask getWorker()
+    public AbstractRxTask getRxTask()
     {
         AbstractRxTask worker = null;
         synchronized (mutex) {
@@ -89,9 +83,9 @@
                         //this means that there are no available workers
                         worker = null;
                     }
-                } else if ( used.size() < this.maxThreads && creator != null) {
-                    worker = creator.getWorkerThread();
-                    setupThread(worker);
+                } else if ( used.size() < this.maxTasks && creator != null) {
+                    worker = creator.createRxTask();
+                    configureTask(worker);
                 } else {
                     try { mutex.wait(); } catch ( java.lang.InterruptedException x ) {Thread.currentThread().interrupted();}
                 }
@@ -114,7 +108,7 @@
             synchronized (mutex) {
                 used.remove(worker);
                 //if ( idle.size() < minThreads && !idle.contains(worker)) idle.add(worker);
-                if ( idle.size() < maxThreads && !idle.contains(worker)) idle.add(worker); //let max be the upper limit
+                if ( idle.size() < maxTasks && !idle.contains(worker)) idle.add(worker); //let max be the upper limit
                 else {
                     worker.setDoRun(false);
                     synchronized (worker){worker.notify();}
@@ -128,11 +122,11 @@
     }
 
     public int getMaxThreads() {
-        return maxThreads;
+        return maxTasks;
     }
 
     public int getMinThreads() {
-        return minThreads;
+        return minTasks;
     }
 
     public void stop() {
@@ -147,19 +141,19 @@
         }
     }
 
-    public void setMaxThreads(int maxThreads) {
-        this.maxThreads = maxThreads;
+    public void setMaxTasks(int maxThreads) {
+        this.maxTasks = maxThreads;
     }
 
-    public void setMinThreads(int minThreads) {
-        this.minThreads = minThreads;
+    public void setMinTasks(int minThreads) {
+        this.minTasks = minThreads;
     }
 
-    public TaskCreator getThreadCreator() {
+    public TaskCreator getTaskCreator() {
         return this.creator;
     }
     
-    public static interface TaskCreator {
-        public AbstractRxTask getWorkerThread();
+    public static interface TaskCreator  {
+        public AbstractRxTask createRxTask();
     }
 }

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java?view=diff&rev=487420&r1=487419&r2=487420
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java Thu Dec 14 17:35:06 2006
@@ -53,6 +53,7 @@
      * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
      */
     public void start() throws IOException {
+        super.start();
         try {
             setPool(new RxTaskPool(getMaxThreads(),getMinThreads(),this));
         } catch (Exception x) {
@@ -73,7 +74,7 @@
         }
     }
     
-    public AbstractRxTask getWorkerThread() {
+    public AbstractRxTask createRxTask() {
         return getReplicationThread();
     }
     
@@ -93,6 +94,7 @@
         try {
             this.serverSocket.close();
         }catch ( Exception x ) {}
+        super.stop();
     }
 
     
@@ -125,20 +127,21 @@
 
         while ( doListen() ) {
             Socket socket = null;
-            if ( getPool().available() < 1 ) {
+            if ( getTaskPool().available() < 1 ) {
                 if ( log.isWarnEnabled() )
                     log.warn("All BIO server replication threads are busy, unable to handle more requests until a thread is freed up.");
             }
-            BioReplicationTask thread = (BioReplicationTask)getPool().getWorker();
-            if ( thread == null ) continue; //should never happen
+            BioReplicationTask task = (BioReplicationTask)getTaskPool().getRxTask();
+            if ( task == null ) continue; //should never happen
             try {
                 socket = serverSocket.accept();
             }catch ( Exception x ) {
                 if ( doListen() ) throw x;
             }
             if ( !doListen() ) {
-                thread.setDoRun(false);
-                thread.serviceSocket(null,null);
+                task.setDoRun(false);
+                task.serviceSocket(null,null);
+                getExecutor().execute(task);
                 break; //regular shutdown
             }
             if ( socket == null ) continue;
@@ -152,7 +155,7 @@
             socket.setTrafficClass(getSoTrafficClass());
             socket.setSoTimeout(getTimeout());
             ObjectReader reader = new ObjectReader(socket);
-            thread.serviceSocket(socket,reader);
+            task.serviceSocket(socket,reader);
         }//while
     }
     

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReplicationTask.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReplicationTask.java?view=diff&rev=487420&r1=487419&r2=487420
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReplicationTask.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReplicationTask.java Thu Dec 14 17:35:06 2006
@@ -59,32 +59,19 @@
     // loop forever waiting for work to do
     public synchronized void run()
     {
-        this.notify();
-        while (isDoRun()) {
-            try {
-                // sleep and release object lock
-                this.wait();
-            } catch (InterruptedException e) {
-                if(log.isInfoEnabled())
-                    log.info("TCP worker thread interrupted in cluster",e);
-                // clear interrupt status
-                Thread.interrupted();
-            }
-            if ( socket == null ) continue;
-            try {
-                drainSocket();
-            } catch ( Exception x ) {
-                log.error("Unable to service bio socket");
-            }finally {
-                try {socket.close();}catch ( Exception ignore){}
-                try {reader.close();}catch ( Exception ignore){}
-                reader = null;
-                socket = null;
-            }
-            // done, ready for more, return to pool
-            if ( getPool() != null ) getPool().returnWorker (this);
-            else setDoRun(false);
+        if ( socket == null ) return;
+        try {
+            drainSocket();
+        } catch ( Exception x ) {
+            log.error("Unable to service bio socket");
+        }finally {
+            try {socket.close();}catch ( Exception ignore){}
+            try {reader.close();}catch ( Exception ignore){}
+            reader = null;
+            socket = null;
         }
+        // done, ready for more, return to pool
+        if ( getTaskPool() != null ) getTaskPool().returnWorker (this);
     }
 
     

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java?view=diff&rev=487420&r1=487419&r2=487420
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java Thu Dec 14 17:35:06 2006
@@ -80,6 +80,7 @@
 
     public void stop() {
         this.stopListening();
+        super.stop();
     }
 
     /**
@@ -88,8 +89,8 @@
      * @see org.apache.catalina.tribes.ClusterReceiver#start()
      */
     public void start() throws IOException {
+        super.start();
         try {
-//            setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this));
             setPool(new RxTaskPool(getMaxThreads(),getMinThreads(),this));
         } catch (Exception x) {
             log.fatal("ThreadPool can initilzed. Listener not started", x);
@@ -109,7 +110,7 @@
         }
     }
     
-    public AbstractRxTask getWorkerThread() {
+    public AbstractRxTask createRxTask() {
         NioReplicationTask thread = new NioReplicationTask(this,this);
         thread.setUseBufferPool(this.getUseBufferPool());
         thread.setRxBufSize(getRxBufSize());
@@ -142,7 +143,7 @@
                 events.add(event);
             }
             if ( log.isTraceEnabled() ) log.trace("Adding event to selector:"+event);
-            selector.wakeup();
+            if ( isListening() && selector!=null ) selector.wakeup();
         }
     }
 
@@ -177,7 +178,9 @@
     
     protected void socketTimeouts() {
         //timeout
-        Set keys = selector.keys();
+        Selector tmpsel = selector;
+        Set keys =  (isListening()&&tmpsel!=null)?tmpsel.keys():null;
+        if ( keys == null ) return;
         long now = System.currentTimeMillis();
         for (Iterator iter = keys.iterator(); iter.hasNext(); ) {
             SelectionKey key = (SelectionKey) iter.next();
@@ -365,17 +368,18 @@
      *  will then de-register the channel on the next select call.
      */
     protected void readDataFromSocket(SelectionKey key) throws Exception {
-        NioReplicationTask worker = (NioReplicationTask) getPool().getWorker();
-        if (worker == null) {
-            // No threads available, do nothing, the selection
+        NioReplicationTask task = (NioReplicationTask) getTaskPool().getRxTask();
+        if (task == null) {
+            // No threads/tasks available, do nothing, the selection
             // loop will keep calling this method until a
             // thread becomes available, the thread pool itself has a waiting mechanism
             // so we will not wait here.
-            if (log.isDebugEnabled())
-                log.debug("No TcpReplicationThread available");
+            if (log.isDebugEnabled()) log.debug("No TcpReplicationThread available");
         } else {
             // invoking this wakes up the worker thread then returns
-            worker.serviceChannel(key);
+            //add task to thread pool
+            task.serviceChannel(key);
+            getExecutor().execute(task);
         }
     }
 

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java?view=diff&rev=487420&r1=487419&r2=487420
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java Thu Dec 14 17:35:06 2006
@@ -63,60 +63,50 @@
 
     // loop forever waiting for work to do
     public synchronized void run() { 
-        this.notify();
         if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER ) {
             buffer = ByteBuffer.allocateDirect(getRxBufSize());
         }else {
             buffer = ByteBuffer.allocate (getRxBufSize());
         }
-        while (isDoRun()) {
-            try {
-                // sleep and release object lock
-                this.wait();
-            } catch (InterruptedException e) {
-                if(log.isInfoEnabled()) log.info("TCP worker thread interrupted in cluster",e);
-                // clear interrupt status
-                Thread.interrupted();
-            }
-            if (key == null) {
-                continue;	// just in case
-            }
-            if ( log.isTraceEnabled() ) 
-                log.trace("Servicing key:"+key);
 
-            try {
-                ObjectReader reader = (ObjectReader)key.attachment();
-                if ( reader == null ) {
-                    if ( log.isTraceEnabled() ) 
-                        log.trace("No object reader, cancelling:"+key);
-                    cancelKey(key);
-                } else {
-                    if ( log.isTraceEnabled() ) 
-                        log.trace("Draining channel:"+key);
+        if (key == null) {
+            return;	// just in case
+        }
+        if ( log.isTraceEnabled() ) 
+            log.trace("Servicing key:"+key);
 
-                    drainChannel(key, reader);
-                }
-            } catch (Exception e) {
-                //this is common, since the sockets on the other
-                //end expire after a certain time.
-                if ( e instanceof CancelledKeyException ) {
-                    //do nothing
-                } else if ( e instanceof IOException ) {
-                    //dont spew out stack traces for IO exceptions unless debug is enabled.
-                    if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].", e);
-                    else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].");
-                } else if ( log.isErrorEnabled() ) {
-                    //this is a real error, log it.
-                    log.error("Exception caught in TcpReplicationThread.drainChannel.",e);
-                } 
+        try {
+            ObjectReader reader = (ObjectReader)key.attachment();
+            if ( reader == null ) {
+                if ( log.isTraceEnabled() ) 
+                    log.trace("No object reader, cancelling:"+key);
                 cancelKey(key);
-            } finally {
-                
+            } else {
+                if ( log.isTraceEnabled() ) 
+                    log.trace("Draining channel:"+key);
+
+                drainChannel(key, reader);
             }
-            key = null;
-            // done, ready for more, return to pool
-            getPool().returnWorker (this);
+        } catch (Exception e) {
+            //this is common, since the sockets on the other
+            //end expire after a certain time.
+            if ( e instanceof CancelledKeyException ) {
+                //do nothing
+            } else if ( e instanceof IOException ) {
+                //dont spew out stack traces for IO exceptions unless debug is enabled.
+                if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].", e);
+                else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].");
+            } else if ( log.isErrorEnabled() ) {
+                //this is a real error, log it.
+                log.error("Exception caught in TcpReplicationThread.drainChannel.",e);
+            } 
+            cancelKey(key);
+        } finally {
+
         }
+        key = null;
+        // done, ready for more, return to pool
+        getTaskPool().returnWorker (this);
     }
 
     /**
@@ -131,14 +121,12 @@
      * worker thread is servicing it.
      */
     public synchronized void serviceChannel (SelectionKey key) {
-        if ( log.isTraceEnabled() ) 
-            log.trace("About to service key:"+key);
+        if ( log.isTraceEnabled() ) log.trace("About to service key:"+key);
         ObjectReader reader = (ObjectReader)key.attachment();
         if ( reader != null ) reader.setLastAccess(System.currentTimeMillis());
         this.key = key;
         key.interestOps (key.interestOps() & (~SelectionKey.OP_READ));
         key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE));
-        this.notify();		// awaken the thread
     }
 
     /**



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org