You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jetspeed-dev@portals.apache.org by ra...@apache.org on 2003/11/30 16:30:33 UTC

cvs commit: jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/impl HttpBufferedResponse.java ContentDispatcherImpl.java PortletRendererImpl.java Worker.java WorkerMonitor.java QueueMonitor.java

raphael     2003/11/30 07:30:33

  Modified:    portal/src/java/org/apache/jetspeed/aggregator Tag:
                        aggregation_1-branch ContentDispatcher.java
               portal/src/java/org/apache/jetspeed/aggregator/impl Tag:
                        aggregation_1-branch ContentDispatcherImpl.java
                        PortletRendererImpl.java Worker.java
                        WorkerMonitor.java
  Added:       portal/src/java/org/apache/jetspeed/aggregator/impl Tag:
                        aggregation_1-branch HttpBufferedResponse.java
  Removed:     portal/src/java/org/apache/jetspeed/aggregator/impl Tag:
                        aggregation_1-branch QueueMonitor.java
  Log:
  - Remove polling QueueMonitor in favor of job limits per worker
  - Flesh out content dispatcher
  - Make sure code compiles cleanly
  
  Revision  Changes    Path
  No                   revision
  No                   revision
  1.1.2.2   +13 -4     jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/Attic/ContentDispatcher.java
  
  Index: ContentDispatcher.java
  ===================================================================
  RCS file: /home/cvs/jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/Attic/ContentDispatcher.java,v
  retrieving revision 1.1.2.1
  retrieving revision 1.1.2.2
  diff -u -r1.1.2.1 -r1.1.2.2
  --- ContentDispatcher.java	29 Nov 2003 23:00:01 -0000	1.1.2.1
  +++ ContentDispatcher.java	30 Nov 2003 15:30:32 -0000	1.1.2.2
  @@ -53,9 +53,14 @@
    */
   package org.apache.jetspeed.aggregator;
   
  +import javax.servlet.ServletOutputStream;
  +import javax.servlet.http.HttpServletRequest;
  +import javax.servlet.http.HttpServletResponse;
  +
   import org.apache.jetspeed.om.page.Fragment;
  -import javax.portlet.PortletRequest;
  -import javax.portlet.PortletResponse;
  +import org.apache.jetspeed.request.RequestContext;
  +import org.apache.pluto.om.common.ObjectID;
  +import org.apache.pluto.om.window.PortletWindow;
   
   /**
    * <p>The ContentDispatcher allows customer classes to retrieved
  @@ -72,5 +77,9 @@
        * If the fragment rendered content is not yet available, the method will
        * hold until it's completely rendered.
        */
  -    public void include(Fragment fragment, PortletRequest req, PortletResponse rsp);
  +    public void include(Fragment fragment, HttpServletRequest req, HttpServletResponse rsp);
  +
  +    public void notify(ObjectID oid);
  +
  +    public HttpServletResponse register(PortletWindow window, RequestContext request);
   }
  
  
  
  No                   revision
  No                   revision
  1.1.2.2   +128 -4    jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/impl/Attic/ContentDispatcherImpl.java
  
  Index: ContentDispatcherImpl.java
  ===================================================================
  RCS file: /home/cvs/jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/impl/Attic/ContentDispatcherImpl.java,v
  retrieving revision 1.1.2.1
  retrieving revision 1.1.2.2
  diff -u -r1.1.2.1 -r1.1.2.2
  --- ContentDispatcherImpl.java	29 Nov 2003 23:00:01 -0000	1.1.2.1
  +++ ContentDispatcherImpl.java	30 Nov 2003 15:30:33 -0000	1.1.2.2
  @@ -53,10 +53,22 @@
    */
   package org.apache.jetspeed.aggregator.impl;
   
  +import java.util.Map;
  +import java.util.Hashtable;
  +
  +import javax.servlet.ServletOutputStream;
  +import javax.servlet.http.HttpServletRequest;
  +import javax.servlet.http.HttpServletResponse;
  +
  +import org.apache.commons.logging.Log;
  +import org.apache.commons.logging.LogFactory;
   import org.apache.jetspeed.aggregator.ContentDispatcher;
   import org.apache.jetspeed.om.page.Fragment;
  -import javax.portlet.PortletRequest;
  -import javax.portlet.PortletResponse;
  +import org.apache.jetspeed.request.RequestContext;
  +import org.apache.jetspeed.util.JetspeedObjectID;
  +import org.apache.jetspeed.util.ByteArrayServletOutputStream;
  +import org.apache.pluto.om.common.ObjectID;
  +import org.apache.pluto.om.window.PortletWindow;
   
   /**
    * <p>The ContentDispatcher allows customer classes to retrieved
  @@ -67,13 +79,125 @@
    */
   public class ContentDispatcherImpl implements ContentDispatcher
   {
  +    /** Commons logging */
  +    protected final static Log log = LogFactory.getLog(ContentDispatcherImpl.class);
  +
  +    private Map contents = new Hashtable();
  +
       /**
        * Include in the provided PortletResponse output stream the rendered content
        * of the request fragment.
        * If the fragment rendered content is not yet available, the method will
        * hold until it's completely rendered.
        */
  -    public void include(Fragment fragment, PortletRequest req, PortletResponse rsp)
  +    public void include(Fragment fragment, HttpServletRequest req, HttpServletResponse rsp)
  +    {
  +        ObjectID oid = JetspeedObjectID.createFromString(fragment.getId());
  +
  +        PortletContent content = (PortletContent)contents.get(oid);
  +
  +        if (content!=null)
  +        {
  +            synchronized (content)
  +            {
  +                if (!content.isComplete())
  +                {
  +                    log.debug("Waiting for content of fragment "+oid);
  +                    try
  +                    {
  +                        content.wait();
  +                    }
  +                    catch (InterruptedException e)
  +                    {
  +                    }
  +                    log.debug("Been notified that content "+oid+" is complete");
  +                }
  +            }
  +
  +            try
  +            {
  +                try
  +                {
  +                    rsp.getWriter().write(new String(content.toByteArray(),
  +                                                     rsp.getCharacterEncoding())
  +                                         );
  +
  +                }
  +                catch (IllegalStateException e)
  +                {
  +                    rsp.getOutputStream().write(content.toByteArray());
  +                }
  +            }
  +            catch (Exception e)
  +            {
  +                log.error("Unable to include content "+oid+" in response object", e);
  +            }
  +            finally
  +            {
  +                synchronized(contents)
  +                {
  +                    log.debug("Removing content "+oid);
  +                    contents.remove(oid);
  +                }
  +            }
  +        }
  +    }
  +
  +    public void notify(ObjectID oid)
  +    {
  +        PortletContent content = (PortletContent)contents.get(oid);
  +
  +        if (content!=null)
  +        {
  +            synchronized (content)
  +            {
  +                content.setComplete(true);
  +                content.notifyAll();
  +            }
  +        }
  +    }
  +
  +    public HttpServletResponse register(PortletWindow window, RequestContext request)
  +    {
  +        PortletContent myContent = new PortletContent();
  +
  +        synchronized (contents)
  +        {
  +            contents.put(window.getId(),myContent);
  +        }
  +
  +        return new HttpBufferedResponse(request.getResponseForWindow(window),
  +                                        myContent.getOutputStream());
  +    }
  +
  +    protected class PortletContent
       {
  +        private ByteArrayServletOutputStream os;
  +        private boolean complete = false;
  +
  +        PortletContent()
  +        {
  +            os = new ByteArrayServletOutputStream();
  +        }
  +
  +        public ServletOutputStream getOutputStream()
  +        {
  +            return os;
  +        }
  +
  +        public byte[] toByteArray()
  +        {
  +            return os.toByteArray();
  +        }
  +
  +        public boolean isComplete()
  +        {
  +            return complete;
  +        }
  +
  +        void setComplete(boolean state)
  +        {
  +            this.complete = state;
  +        }
       }
   }
  
  
  
  1.1.2.2   +4 -7      jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/impl/Attic/PortletRendererImpl.java
  
  Index: PortletRendererImpl.java
  ===================================================================
  RCS file: /home/cvs/jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/impl/Attic/PortletRendererImpl.java,v
  retrieving revision 1.1.2.1
  retrieving revision 1.1.2.2
  diff -u -r1.1.2.1 -r1.1.2.2
  --- PortletRendererImpl.java	29 Nov 2003 23:00:01 -0000	1.1.2.1
  +++ PortletRendererImpl.java	30 Nov 2003 15:30:33 -0000	1.1.2.2
  @@ -157,15 +157,12 @@
           ObjectID oid = JetspeedObjectID.createFromString(fragment.getId());
           PortletEntity portletEntity = PortletEntityAccess.getEntity(oid);
           PortletWindow portletWindow = PortletWindowFactory.getWindow(portletEntity, oid);
  -
  -        HttpServletRequest servletRequest = request.getRequestForWindow(portletWindow);
  -        HttpServletResponse servletResponse = request.getResponseForWindow(portletWindow);
  -
           ContentDispatcher dispatcher = getDispatcher(request);
   
  +        HttpServletResponse servletResponse = dispatcher.register(portletWindow, request);
  +
           rJob.setWindow(portletWindow);
  -        // We should actually wrap these objects with thread-safe versions
  -        rJob.setRequest(servletRequest);
  +        rJob.setRequest(request.getRequestForWindow(portletWindow));
           rJob.setResponse(servletResponse);
           rJob.setDispatcher(dispatcher);
   
  
  
  
  1.1.2.2   +24 -2     jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/impl/Attic/Worker.java
  
  Index: Worker.java
  ===================================================================
  RCS file: /home/cvs/jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/impl/Attic/Worker.java,v
  retrieving revision 1.1.2.1
  retrieving revision 1.1.2.2
  diff -u -r1.1.2.1 -r1.1.2.2
  --- Worker.java	29 Nov 2003 23:00:01 -0000	1.1.2.1
  +++ Worker.java	30 Nov 2003 15:30:33 -0000	1.1.2.2
  @@ -73,6 +73,10 @@
       /** Running status of this worker */
       private boolean running = true;
   
  +    /** Counter of consecutive jobs that can be processed before the
  +        worker being actually put back on the idle queue */
  +    private int jobCount = 0;
  +
       /** Job to process */
       private Runnable job = null;
   
  @@ -94,6 +98,23 @@
       }
   
       /**
  +     * Return the number of jobs processed by this worker since the last time it
  +     * has been on the idle queue
  +     */
  +    public int getJobCount()
  +    {
  +        return this.jobCount;
  +    }
  +
  +    /**
  +     * Reset the processed job counter
  +     */
  +    public void resetJobCount()
  +    {
  +        this.jobCount=0;
  +    }
  +
  +    /**
        * Sets the running status of this Worker. If set to false, the Worker will
        * stop after processing its current job.
        */
  @@ -102,7 +123,6 @@
           this.running = status;
       }
   
  -
       /**
        * Sets the moitor of this worker
        */
  @@ -163,6 +183,8 @@
                       log.error("Thread error", t);
                   }
               }
  +
  +            this.jobCount++;
   
               // release the worker
               monitor.release(this);
  
  
  
  1.1.2.2   +24 -8     jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/impl/Attic/WorkerMonitor.java
  
  Index: WorkerMonitor.java
  ===================================================================
  RCS file: /home/cvs/jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/impl/Attic/WorkerMonitor.java,v
  retrieving revision 1.1.2.1
  retrieving revision 1.1.2.2
  diff -u -r1.1.2.1 -r1.1.2.2
  --- WorkerMonitor.java	29 Nov 2003 23:00:01 -0000	1.1.2.1
  +++ WorkerMonitor.java	30 Nov 2003 15:30:33 -0000	1.1.2.2
  @@ -88,6 +88,9 @@
       /** Minimum amount of spare workers */
       private int spareWorkers = 5;
   
  +    /** Maximum of job processed by a worker before being released */
  +    private int maxJobsPerWorker = 10;
  +
       /** Stack containing currently idle workers */
       private Stack workers = new Stack();
   
  @@ -97,12 +100,8 @@
       /** Job queue */
       private Queue queue;
   
  -    /** QueueMonitor to process pending jobs queue */
  -    private QueueMonitor qMonitor;
  -
       public void init()
       {
  -        qMonitor = new QueueMonitor(queue,this);
       }
   
       /**
  @@ -122,7 +121,7 @@
                   wCount = maxWorkers - wCurrent;
               }
   
  -            log.info("Creating "+ wCount +" more workers -> "+ (wCurrent + wCount));
  +            log.info("Creating "+ wCount +" workers -> "+ (wCurrent + wCount));
   
               for (int i = 0; i < wCount; ++i)
               {
  @@ -188,11 +187,28 @@
       }
   
       /**
  -     * Put back the worker in the idle queue
  +     * Put back the worker in the idle queue unless there are pending jobs and
  +     * worker can still be committed to a new job before being released.
        */
       public void release(Worker worker)
       {
  -        worker.setJob(null);
  +        // if worker can still proces some jobs assign the first
  +        // backlog job to this worker, else reset job count and put
  +        // it on the idle queue.
  +
  +        synchronized (worker)
  +        {
  +            if (worker.getJobCount()<this.maxJobsPerWorker)
  +            {
  +                worker.setJob((RenderingJob)queue.pop());
  +                return;
  +            }
  +            else
  +            {
  +                worker.setJob(null);
  +                worker.resetJobCount();
  +            }
  +        }
   
           synchronized (this.workers)
           {
  
  
  
  No                   revision
  
  Index: WorkerMonitor.java
  ===================================================================
  RCS file: /home/cvs/jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/impl/Attic/WorkerMonitor.java,v
  retrieving revision 1.1.2.1
  retrieving revision 1.1.2.2
  diff -u -r1.1.2.1 -r1.1.2.2
  --- WorkerMonitor.java	29 Nov 2003 23:00:01 -0000	1.1.2.1
  +++ WorkerMonitor.java	30 Nov 2003 15:30:33 -0000	1.1.2.2
  @@ -88,6 +88,9 @@
       /** Minimum amount of spare workers */
       private int spareWorkers = 5;
   
  +    /** Maximum of job processed by a worker before being released */
  +    private int maxJobsPerWorker = 10;
  +
       /** Stack containing currently idle workers */
       private Stack workers = new Stack();
   
  @@ -97,12 +100,8 @@
       /** Job queue */
       private Queue queue;
   
  -    /** QueueMonitor to process pending jobs queue */
  -    private QueueMonitor qMonitor;
  -
       public void init()
       {
  -        qMonitor = new QueueMonitor(queue,this);
       }
   
       /**
  @@ -122,7 +121,7 @@
                   wCount = maxWorkers - wCurrent;
               }
   
  -            log.info("Creating "+ wCount +" more workers -> "+ (wCurrent + wCount));
  +            log.info("Creating "+ wCount +" workers -> "+ (wCurrent + wCount));
   
               for (int i = 0; i < wCount; ++i)
               {
  @@ -188,11 +187,28 @@
       }
   
       /**
  -     * Put back the worker in the idle queue
  +     * Put back the worker in the idle queue unless there are pending jobs and
  +     * worker can still be committed to a new job before being released.
        */
       public void release(Worker worker)
       {
  -        worker.setJob(null);
  +        // if worker can still proces some jobs assign the first
  +        // backlog job to this worker, else reset job count and put
  +        // it on the idle queue.
  +
  +        synchronized (worker)
  +        {
  +            if (worker.getJobCount()<this.maxJobsPerWorker)
  +            {
  +                worker.setJob((RenderingJob)queue.pop());
  +                return;
  +            }
  +            else
  +            {
  +                worker.setJob(null);
  +                worker.resetJobCount();
  +            }
  +        }
   
           synchronized (this.workers)
           {
  
  
  
  No                   revision
  
  Index: WorkerMonitor.java
  ===================================================================
  RCS file: /home/cvs/jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/impl/Attic/WorkerMonitor.java,v
  retrieving revision 1.1.2.1
  retrieving revision 1.1.2.2
  diff -u -r1.1.2.1 -r1.1.2.2
  --- WorkerMonitor.java	29 Nov 2003 23:00:01 -0000	1.1.2.1
  +++ WorkerMonitor.java	30 Nov 2003 15:30:33 -0000	1.1.2.2
  @@ -88,6 +88,9 @@
       /** Minimum amount of spare workers */
       private int spareWorkers = 5;
   
  +    /** Maximum of job processed by a worker before being released */
  +    private int maxJobsPerWorker = 10;
  +
       /** Stack containing currently idle workers */
       private Stack workers = new Stack();
   
  @@ -97,12 +100,8 @@
       /** Job queue */
       private Queue queue;
   
  -    /** QueueMonitor to process pending jobs queue */
  -    private QueueMonitor qMonitor;
  -
       public void init()
       {
  -        qMonitor = new QueueMonitor(queue,this);
       }
   
       /**
  @@ -122,7 +121,7 @@
                   wCount = maxWorkers - wCurrent;
               }
   
  -            log.info("Creating "+ wCount +" more workers -> "+ (wCurrent + wCount));
  +            log.info("Creating "+ wCount +" workers -> "+ (wCurrent + wCount));
   
               for (int i = 0; i < wCount; ++i)
               {
  @@ -188,11 +187,28 @@
       }
   
       /**
  -     * Put back the worker in the idle queue
  +     * Put back the worker in the idle queue unless there are pending jobs and
  +     * worker can still be committed to a new job before being released.
        */
       public void release(Worker worker)
       {
  -        worker.setJob(null);
  +        // if worker can still proces some jobs assign the first
  +        // backlog job to this worker, else reset job count and put
  +        // it on the idle queue.
  +
  +        synchronized (worker)
  +        {
  +            if (worker.getJobCount()<this.maxJobsPerWorker)
  +            {
  +                worker.setJob((RenderingJob)queue.pop());
  +                return;
  +            }
  +            else
  +            {
  +                worker.setJob(null);
  +                worker.resetJobCount();
  +            }
  +        }
   
           synchronized (this.workers)
           {
  
  
  
  1.1.2.1   +107 -0    jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/impl/Attic/HttpBufferedResponse.java
  
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: jetspeed-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: jetspeed-dev-help@jakarta.apache.org