You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2009/09/14 22:06:03 UTC

svn commit: r814823 - in /tomcat/trunk: java/org/apache/juli/AsyncFileHandler.java webapps/docs/config/systemprops.xml

Author: fhanik
Date: Mon Sep 14 20:06:03 2009
New Revision: 814823

URL: http://svn.apache.org/viewvc?rev=814823&view=rev
Log:
Refactoring of async logging handler. Much cleaner code achieves the exact same functionality.
The fact that the queue is bounded means that we never have to worry about the size


Modified:
    tomcat/trunk/java/org/apache/juli/AsyncFileHandler.java
    tomcat/trunk/webapps/docs/config/systemprops.xml

Modified: tomcat/trunk/java/org/apache/juli/AsyncFileHandler.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/juli/AsyncFileHandler.java?rev=814823&r1=814822&r2=814823&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/juli/AsyncFileHandler.java (original)
+++ tomcat/trunk/java/org/apache/juli/AsyncFileHandler.java Mon Sep 14 20:06:03 2009
@@ -16,10 +16,8 @@
  */
 package org.apache.juli;
 
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.LogRecord;
 /**
  * 
@@ -31,20 +29,20 @@
     public static final int OVERFLOW_DROP_LAST = 1;
     public static final int OVERFLOW_DROP_FIRST = 2;
     public static final int OVERFLOW_DROP_FLUSH = 3;
+    public static final int OVERFLOW_DROP_CURRENT = 4;
     
     public static final int OVERFLOW_DROP_TYPE = Integer.parseInt(System.getProperty("org.apache.juli.AsyncOverflowDropType","1"));
-    public static final int DEFAULT_MAX_RECORDS = Integer.parseInt(System.getProperty("org.apache.juli.AsyncMaxRecordCount","1000"));
-    public static final int RECORD_BATCH_COUNT = Integer.parseInt(System.getProperty("org.apache.juli.AsyncRecordBatchCount","100"));
+    public static final int DEFAULT_MAX_RECORDS = Integer.parseInt(System.getProperty("org.apache.juli.AsyncMaxRecordCount","10000"));
+    public static final int LOGGER_SLEEP_TIME = Integer.parseInt(System.getProperty("org.apache.juli.AsyncLoggerPollInterval","1000"));
+   
+    protected static LinkedBlockingDeque<LogEntry> queue = new LinkedBlockingDeque<LogEntry>(DEFAULT_MAX_RECORDS);
     
-    protected static ConcurrentLinkedQueue<FileHandler> handlers = new ConcurrentLinkedQueue<FileHandler>();
-    protected static SignalAtomicLong recordCounter = new SignalAtomicLong();
     protected static LoggerThread logger = new LoggerThread();
     
     static {
         logger.start();
     }
     
-    protected LogQueue<LogRecord> queue = new LogQueue<LogRecord>();
     protected volatile boolean closed = false;
     
     public AsyncFileHandler() {
@@ -62,9 +60,6 @@
         closed = true;
         // TODO Auto-generated method stub
         super.close();
-        handlers.remove(this);
-        //empty the queue of log entries for this log
-        while (queue.poll()!=null) recordCounter.addAndGet(-1);
     }
     
     @Override
@@ -73,7 +68,6 @@
         closed = false;
         // TODO Auto-generated method stub
         super.open();
-        handlers.add(this);
     }
     
 
@@ -82,91 +76,43 @@
         if (!isLoggable(record)) {
             return;
         }
-        this.queue.offer(record);
+        LogEntry entry = new LogEntry(record,this);
+        boolean added = false;
+        try {
+            while (!added && !queue.offer(entry)) {
+                switch (OVERFLOW_DROP_TYPE) {
+                    case OVERFLOW_DROP_LAST: {
+                        //remove the last added element
+                        queue.pollLast(); 
+                        break;
+                    }
+                    case OVERFLOW_DROP_FIRST: {
+                        //remove the first element in the queue
+                        queue.pollFirst();
+                        break;
+                    }
+                    case OVERFLOW_DROP_FLUSH: {
+                        added = queue.offer(entry,1000,TimeUnit.MILLISECONDS);
+                        break;
+                    }
+                    case OVERFLOW_DROP_CURRENT: {
+                        added = true;
+                        break;
+                    }
+                }//switch
+            }//while
+        }catch (InterruptedException x) {
+            //allow thread to be interrupted and back out of the publish operation
+            //after this we clear the flag
+            Thread.interrupted();
+        }
+        
     }
     
     protected void publishInternal(LogRecord record) {
-        recordCounter.addAndGet(-1);
         super.publish(record);
     }
 
-    @Override
-    protected void finalize() throws Throwable {
-        // TODO Auto-generated method stub
-        super.finalize();
-    }
-
-    public int getMaxRecords() {
-        return this.queue.max;
-    }
-
-    public void setMaxRecords(int maxRecords) {
-        this.queue.max = maxRecords;
-    }
-
-    public int getOverflowAction() {
-        return this.queue.type;
-    }
-
-    public void setOverflowAction(int type) {
-        this.queue.type = type;
-    }
-    
-    protected static class SignalAtomicLong {
-        AtomicLong delegate = new AtomicLong(0);
-        ReentrantLock lock = new ReentrantLock();
-        Condition sleepUntilPositiveCond = lock.newCondition();
-        Condition sleepUntilEmpty = lock.newCondition();
-        
-        public long addAndGet(long i) {
-            long prevValue = delegate.getAndAdd(i);
-            if (prevValue<=0 && i>0) {
-                lock.lock();
-                try {
-                    sleepUntilPositiveCond.signalAll();
-                } finally {
-                    lock.unlock();
-                }
-            } else if (prevValue>0 && delegate.get()<=0) {
-                lock.lock();
-                try {
-                    sleepUntilEmpty.signalAll();
-                } finally {
-                    lock.unlock();
-                }
-            }
-            return delegate.get();
-        }
-        
-        public void sleepUntilPositive() throws InterruptedException {
-            if (delegate.get()>0) return;
-            lock.lock();
-            try {
-                if (delegate.get()>0) return;
-                sleepUntilPositiveCond.await();
-            } finally {
-                lock.unlock();
-            }
-        }
-        
-        public void sleepUntilEmpty() throws InterruptedException {
-            if (delegate.get()<=0) return;
-            lock.lock();
-            try {
-                if (delegate.get()<=0) return;
-                sleepUntilPositiveCond.await();
-            } finally {
-                lock.unlock();
-            }
-            
-        }
-        
-        public long get() {
-            return delegate.get();
-        }
-        
-    }
-    
     protected static class LoggerThread extends Thread {
         protected boolean run = true;
         public LoggerThread() {
@@ -177,81 +123,36 @@
         public void run() {
             while (run) {
                 try {
-                    AsyncFileHandler.recordCounter.sleepUntilPositive();
-                } catch (InterruptedException x) {
+                    LogEntry entry = queue.poll(LOGGER_SLEEP_TIME, TimeUnit.MILLISECONDS);
+                    entry.flush();
+                }catch (InterruptedException x) {
                     Thread.interrupted();
-                    continue;
+                }catch (Exception x) {
+                    x.printStackTrace();
                 }
-                AsyncFileHandler[] handlers = AsyncFileHandler.handlers.toArray(new AsyncFileHandler[0]);
-                for (int i=0; run && i<handlers.length; i++) {
-                    int counter = 0;
-                    while (run && (counter++)<RECORD_BATCH_COUNT) {
-                        if (handlers[i].closed) break;
-                        LogRecord record = handlers[i].queue.poll();
-                        if (record==null) break;
-                        handlers[i].publishInternal(record);
-                    }//while
-                }//for
             }//while
         }
     }
     
-    protected static class LogQueue<E> {
-        protected int max = DEFAULT_MAX_RECORDS;
-        protected int type = OVERFLOW_DROP_TYPE;
-        protected ConcurrentLinkedQueue<E> delegate = new ConcurrentLinkedQueue<E>(); 
+    protected static class LogEntry {
+        private LogRecord record;
+        private AsyncFileHandler handler;
+        public LogEntry(LogRecord record, AsyncFileHandler handler) {
+            super();
+            this.record = record;
+            this.handler = handler;
+        }
         
-        public boolean offer(E e) {
-            if (delegate.size()>=max) {
-                switch (type) {
-                    case OVERFLOW_DROP_LAST:
-                        return false;
-                    case OVERFLOW_DROP_FIRST: {
-                        this.poll();
-                        if (delegate.offer(e)) {
-                            recordCounter.addAndGet(1);
-                            return true;
-                        } else {
-                            return false;
-                        }
-                    }
-                    case OVERFLOW_DROP_FLUSH: {
-                        try {
-                            recordCounter.sleepUntilEmpty();
-                        }catch (InterruptedException x) {
-                            //no op - simply continue the operation
-                        }
-                        if (delegate.offer(e)) {
-                            recordCounter.addAndGet(1);
-                            return true;
-                        } else {
-                            return false;
-                        }
-                    }
-                    default:
-                        return false;
-                }
+        public boolean flush() {
+            if (handler.closed) {
+                return false;
             } else {
-                if (delegate.offer(e)) {
-                    recordCounter.addAndGet(1);
-                    return true;
-                } else {
-                    return false;
-                }
-
+                handler.publishInternal(record);
+                return true;
             }
         }
-
-        public E peek() {
-            return delegate.peek();
-        }
-
-        public E poll() {
-            // TODO Auto-generated method stub
-            return delegate.poll();
-        }
         
     }
-
+    
     
 }

Modified: tomcat/trunk/webapps/docs/config/systemprops.xml
URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/config/systemprops.xml?rev=814823&r1=814822&r2=814823&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/config/systemprops.xml (original)
+++ tomcat/trunk/webapps/docs/config/systemprops.xml Mon Sep 14 20:06:03 2009
@@ -336,6 +336,7 @@
            <li><code>int OVERFLOW_DROP_LAST = 1</code> - the record that caused the overflow will be dropped and not logged</li>
            <li><code>int OVERFLOW_DROP_FIRST = 2</code> - the record that is next in line to be logged will be dropped to make room for the latest record on the queue</li>
            <li><code>int OVERFLOW_DROP_FLUSH = 3</code> - suspend the thread while the queue empties out and flushes the entries to the write buffer</li>
+           <li><code>int OVERFLOW_DROP_CURRENT = 4</code> - drop the current log entry</li>
          </ul>
          Default value is <code>1</code> (OVERFLOW_DROP_LAST).
       </p>
@@ -344,12 +345,16 @@
     <property name="org.apache.juli.AsyncMaxRecordCount">
       <p>The max number of log records that the async logger will keep in memory. When this limit is reached and a new record is being logged by the 
          JULI framework the system will take an action based on the <code>org.apache.juli.AsyncOverflowDropType</code> setting.
-         The default value is <code>1000</code> records
+         The default value is <code>10000</code> records.<br/>
+         This number represents the global number of records, not on a per handler basis.
       </p>
     </property>
 
-    <property name="org.apache.juli.AsyncRecordBatchCount">
-      <p>
+    <property name="org.apache.juli.AsyncLoggerPollInterval">
+      <p>The poll interval, in milliseconds, for the asynchronous logger thread.
+         If the log queue is empty, the async thread will issue a poll(poll interval)
+         in order to not wake up too often.
+         The default value is <code>1000</code> milliseconds.
       </p>
     </property>
 



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org