You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jetspeed-dev@portals.apache.org by ra...@apache.org on 2003/11/30 16:30:33 UTC
cvs commit: jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/impl HttpBufferedResponse.java ContentDispatcherImpl.java PortletRendererImpl.java Worker.java WorkerMonitor.java QueueMonitor.java
raphael 2003/11/30 07:30:33
Modified: portal/src/java/org/apache/jetspeed/aggregator Tag:
aggregation_1-branch ContentDispatcher.java
portal/src/java/org/apache/jetspeed/aggregator/impl Tag:
aggregation_1-branch ContentDispatcherImpl.java
PortletRendererImpl.java Worker.java
WorkerMonitor.java
Added: portal/src/java/org/apache/jetspeed/aggregator/impl Tag:
aggregation_1-branch HttpBufferedResponse.java
Removed: portal/src/java/org/apache/jetspeed/aggregator/impl Tag:
aggregation_1-branch QueueMonitor.java
Log:
- Remove polling QueueMonitor in favor of job limits per worker
- Flesh out content dispatcher
- Make sure code compiles cleanly
Revision Changes Path
No revision
No revision
1.1.2.2 +13 -4 jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/Attic/ContentDispatcher.java
Index: ContentDispatcher.java
===================================================================
RCS file: /home/cvs/jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/Attic/ContentDispatcher.java,v
retrieving revision 1.1.2.1
retrieving revision 1.1.2.2
diff -u -r1.1.2.1 -r1.1.2.2
--- ContentDispatcher.java 29 Nov 2003 23:00:01 -0000 1.1.2.1
+++ ContentDispatcher.java 30 Nov 2003 15:30:32 -0000 1.1.2.2
@@ -53,9 +53,14 @@
*/
package org.apache.jetspeed.aggregator;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
import org.apache.jetspeed.om.page.Fragment;
-import javax.portlet.PortletRequest;
-import javax.portlet.PortletResponse;
+import org.apache.jetspeed.request.RequestContext;
+import org.apache.pluto.om.common.ObjectID;
+import org.apache.pluto.om.window.PortletWindow;
/**
* <p>The ContentDispatcher allows customer classes to retrieved
@@ -72,5 +77,9 @@
* If the fragment rendered content is not yet available, the method will
* hold until it's completely rendered.
*/
- public void include(Fragment fragment, PortletRequest req, PortletResponse rsp);
+ public void include(Fragment fragment, HttpServletRequest req, HttpServletResponse rsp);
+
+ public void notify(ObjectID oid);
+
+ public HttpServletResponse register(PortletWindow window, RequestContext request);
}
No revision
No revision
1.1.2.2 +128 -4 jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/impl/Attic/ContentDispatcherImpl.java
Index: ContentDispatcherImpl.java
===================================================================
RCS file: /home/cvs/jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/impl/Attic/ContentDispatcherImpl.java,v
retrieving revision 1.1.2.1
retrieving revision 1.1.2.2
diff -u -r1.1.2.1 -r1.1.2.2
--- ContentDispatcherImpl.java 29 Nov 2003 23:00:01 -0000 1.1.2.1
+++ ContentDispatcherImpl.java 30 Nov 2003 15:30:33 -0000 1.1.2.2
@@ -53,10 +53,22 @@
*/
package org.apache.jetspeed.aggregator.impl;
+import java.util.Map;
+import java.util.Hashtable;
+
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.jetspeed.aggregator.ContentDispatcher;
import org.apache.jetspeed.om.page.Fragment;
-import javax.portlet.PortletRequest;
-import javax.portlet.PortletResponse;
+import org.apache.jetspeed.request.RequestContext;
+import org.apache.jetspeed.util.JetspeedObjectID;
+import org.apache.jetspeed.util.ByteArrayServletOutputStream;
+import org.apache.pluto.om.common.ObjectID;
+import org.apache.pluto.om.window.PortletWindow;
/**
* <p>The ContentDispatcher allows customer classes to retrieved
@@ -67,13 +79,125 @@
*/
public class ContentDispatcherImpl implements ContentDispatcher
{
+ /** Commons logging */
+ protected final static Log log = LogFactory.getLog(ContentDispatcherImpl.class);
+
+ private Map contents = new Hashtable();
+
/**
* Include in the provided PortletResponse output stream the rendered content
* of the request fragment.
* If the fragment rendered content is not yet available, the method will
* hold until it's completely rendered.
*/
- public void include(Fragment fragment, PortletRequest req, PortletResponse rsp)
+ public void include(Fragment fragment, HttpServletRequest req, HttpServletResponse rsp)
+ {
+ ObjectID oid = JetspeedObjectID.createFromString(fragment.getId());
+
+ PortletContent content = (PortletContent)contents.get(oid);
+
+ if (content!=null)
+ {
+ synchronized (content)
+ {
+ if (!content.isComplete())
+ {
+ log.debug("Waiting for content of fragment "+oid);
+ try
+ {
+ content.wait();
+ }
+ catch (InterruptedException e)
+ {
+ }
+ log.debug("Been notified that content "+oid+" is complete");
+ }
+ }
+
+ try
+ {
+ try
+ {
+ rsp.getWriter().write(new String(content.toByteArray(),
+ rsp.getCharacterEncoding())
+ );
+
+ }
+ catch (IllegalStateException e)
+ {
+ rsp.getOutputStream().write(content.toByteArray());
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Unable to include content "+oid+" in response object", e);
+ }
+ finally
+ {
+ synchronized(contents)
+ {
+ log.debug("Removing content "+oid);
+ contents.remove(oid);
+ }
+ }
+ }
+ }
+
+ public void notify(ObjectID oid)
+ {
+ PortletContent content = (PortletContent)contents.get(oid);
+
+ if (content!=null)
+ {
+ synchronized (content)
+ {
+ content.setComplete(true);
+ content.notifyAll();
+ }
+ }
+ }
+
+ public HttpServletResponse register(PortletWindow window, RequestContext request)
+ {
+ PortletContent myContent = new PortletContent();
+
+ synchronized (contents)
+ {
+ contents.put(window.getId(),myContent);
+ }
+
+ return new HttpBufferedResponse(request.getResponseForWindow(window),
+ myContent.getOutputStream());
+ }
+
+ protected class PortletContent
{
+ private ByteArrayServletOutputStream os;
+ private boolean complete = false;
+
+ PortletContent()
+ {
+ os = new ByteArrayServletOutputStream();
+ }
+
+ public ServletOutputStream getOutputStream()
+ {
+ return os;
+ }
+
+ public byte[] toByteArray()
+ {
+ return os.toByteArray();
+ }
+
+ public boolean isComplete()
+ {
+ return complete;
+ }
+
+ void setComplete(boolean state)
+ {
+ this.complete = state;
+ }
}
}
1.1.2.2 +4 -7 jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/impl/Attic/PortletRendererImpl.java
Index: PortletRendererImpl.java
===================================================================
RCS file: /home/cvs/jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/impl/Attic/PortletRendererImpl.java,v
retrieving revision 1.1.2.1
retrieving revision 1.1.2.2
diff -u -r1.1.2.1 -r1.1.2.2
--- PortletRendererImpl.java 29 Nov 2003 23:00:01 -0000 1.1.2.1
+++ PortletRendererImpl.java 30 Nov 2003 15:30:33 -0000 1.1.2.2
@@ -157,15 +157,12 @@
ObjectID oid = JetspeedObjectID.createFromString(fragment.getId());
PortletEntity portletEntity = PortletEntityAccess.getEntity(oid);
PortletWindow portletWindow = PortletWindowFactory.getWindow(portletEntity, oid);
-
- HttpServletRequest servletRequest = request.getRequestForWindow(portletWindow);
- HttpServletResponse servletResponse = request.getResponseForWindow(portletWindow);
-
ContentDispatcher dispatcher = getDispatcher(request);
+ HttpServletResponse servletResponse = dispatcher.register(portletWindow, request);
+
rJob.setWindow(portletWindow);
- // We should actually wrap these objects with thread-safe versions
- rJob.setRequest(servletRequest);
+ rJob.setRequest(request.getRequestForWindow(portletWindow));
rJob.setResponse(servletResponse);
rJob.setDispatcher(dispatcher);
1.1.2.2 +24 -2 jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/impl/Attic/Worker.java
Index: Worker.java
===================================================================
RCS file: /home/cvs/jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/impl/Attic/Worker.java,v
retrieving revision 1.1.2.1
retrieving revision 1.1.2.2
diff -u -r1.1.2.1 -r1.1.2.2
--- Worker.java 29 Nov 2003 23:00:01 -0000 1.1.2.1
+++ Worker.java 30 Nov 2003 15:30:33 -0000 1.1.2.2
@@ -73,6 +73,10 @@
/** Running status of this worker */
private boolean running = true;
+ /** Counter of consecutive jobs that can be processed before the
+ worker being actually put back on the idle queue */
+ private int jobCount = 0;
+
/** Job to process */
private Runnable job = null;
@@ -94,6 +98,23 @@
}
/**
+ * Return the number of jobs processed by this worker since the last time it
+ * has been on the idle queue
+ */
+ public int getJobCount()
+ {
+ return this.jobCount;
+ }
+
+ /**
+ * Reset the processed job counter
+ */
+ public void resetJobCount()
+ {
+ this.jobCount=0;
+ }
+
+ /**
* Sets the running status of this Worker. If set to false, the Worker will
* stop after processing its current job.
*/
@@ -102,7 +123,6 @@
this.running = status;
}
-
/**
* Sets the moitor of this worker
*/
@@ -163,6 +183,8 @@
log.error("Thread error", t);
}
}
+
+ this.jobCount++;
// release the worker
monitor.release(this);
1.1.2.2 +24 -8 jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/impl/Attic/WorkerMonitor.java
Index: WorkerMonitor.java
===================================================================
RCS file: /home/cvs/jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/impl/Attic/WorkerMonitor.java,v
retrieving revision 1.1.2.1
retrieving revision 1.1.2.2
diff -u -r1.1.2.1 -r1.1.2.2
--- WorkerMonitor.java 29 Nov 2003 23:00:01 -0000 1.1.2.1
+++ WorkerMonitor.java 30 Nov 2003 15:30:33 -0000 1.1.2.2
@@ -88,6 +88,9 @@
/** Minimum amount of spare workers */
private int spareWorkers = 5;
+ /** Maximum of job processed by a worker before being released */
+ private int maxJobsPerWorker = 10;
+
/** Stack containing currently idle workers */
private Stack workers = new Stack();
@@ -97,12 +100,8 @@
/** Job queue */
private Queue queue;
- /** QueueMonitor to process pending jobs queue */
- private QueueMonitor qMonitor;
-
public void init()
{
- qMonitor = new QueueMonitor(queue,this);
}
/**
@@ -122,7 +121,7 @@
wCount = maxWorkers - wCurrent;
}
- log.info("Creating "+ wCount +" more workers -> "+ (wCurrent + wCount));
+ log.info("Creating "+ wCount +" workers -> "+ (wCurrent + wCount));
for (int i = 0; i < wCount; ++i)
{
@@ -188,11 +187,28 @@
}
/**
- * Put back the worker in the idle queue
+ * Put back the worker in the idle queue unless there are pending jobs and
+ * worker can still be committed to a new job before being released.
*/
public void release(Worker worker)
{
- worker.setJob(null);
+ // if worker can still process some jobs assign the first
+ // backlog job to this worker, else reset job count and put
+ // it on the idle queue.
+
+ synchronized (worker)
+ {
+ if (worker.getJobCount()<this.maxJobsPerWorker)
+ {
+ worker.setJob((RenderingJob)queue.pop());
+ return;
+ }
+ else
+ {
+ worker.setJob(null);
+ worker.resetJobCount();
+ }
+ }
synchronized (this.workers)
{
No revision
Index: WorkerMonitor.java
===================================================================
RCS file: /home/cvs/jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/impl/Attic/WorkerMonitor.java,v
retrieving revision 1.1.2.1
retrieving revision 1.1.2.2
diff -u -r1.1.2.1 -r1.1.2.2
--- WorkerMonitor.java 29 Nov 2003 23:00:01 -0000 1.1.2.1
+++ WorkerMonitor.java 30 Nov 2003 15:30:33 -0000 1.1.2.2
@@ -88,6 +88,9 @@
/** Minimum amount of spare workers */
private int spareWorkers = 5;
+ /** Maximum of job processed by a worker before being released */
+ private int maxJobsPerWorker = 10;
+
/** Stack containing currently idle workers */
private Stack workers = new Stack();
@@ -97,12 +100,8 @@
/** Job queue */
private Queue queue;
- /** QueueMonitor to process pending jobs queue */
- private QueueMonitor qMonitor;
-
public void init()
{
- qMonitor = new QueueMonitor(queue,this);
}
/**
@@ -122,7 +121,7 @@
wCount = maxWorkers - wCurrent;
}
- log.info("Creating "+ wCount +" more workers -> "+ (wCurrent + wCount));
+ log.info("Creating "+ wCount +" workers -> "+ (wCurrent + wCount));
for (int i = 0; i < wCount; ++i)
{
@@ -188,11 +187,28 @@
}
/**
- * Put back the worker in the idle queue
+ * Put back the worker in the idle queue unless there are pending jobs and
+ * worker can still be committed to a new job before being released.
*/
public void release(Worker worker)
{
- worker.setJob(null);
+ // if worker can still process some jobs assign the first
+ // backlog job to this worker, else reset job count and put
+ // it on the idle queue.
+
+ synchronized (worker)
+ {
+ if (worker.getJobCount()<this.maxJobsPerWorker)
+ {
+ worker.setJob((RenderingJob)queue.pop());
+ return;
+ }
+ else
+ {
+ worker.setJob(null);
+ worker.resetJobCount();
+ }
+ }
synchronized (this.workers)
{
No revision
Index: WorkerMonitor.java
===================================================================
RCS file: /home/cvs/jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/impl/Attic/WorkerMonitor.java,v
retrieving revision 1.1.2.1
retrieving revision 1.1.2.2
diff -u -r1.1.2.1 -r1.1.2.2
--- WorkerMonitor.java 29 Nov 2003 23:00:01 -0000 1.1.2.1
+++ WorkerMonitor.java 30 Nov 2003 15:30:33 -0000 1.1.2.2
@@ -88,6 +88,9 @@
/** Minimum amount of spare workers */
private int spareWorkers = 5;
+ /** Maximum of job processed by a worker before being released */
+ private int maxJobsPerWorker = 10;
+
/** Stack containing currently idle workers */
private Stack workers = new Stack();
@@ -97,12 +100,8 @@
/** Job queue */
private Queue queue;
- /** QueueMonitor to process pending jobs queue */
- private QueueMonitor qMonitor;
-
public void init()
{
- qMonitor = new QueueMonitor(queue,this);
}
/**
@@ -122,7 +121,7 @@
wCount = maxWorkers - wCurrent;
}
- log.info("Creating "+ wCount +" more workers -> "+ (wCurrent + wCount));
+ log.info("Creating "+ wCount +" workers -> "+ (wCurrent + wCount));
for (int i = 0; i < wCount; ++i)
{
@@ -188,11 +187,28 @@
}
/**
- * Put back the worker in the idle queue
+ * Put back the worker in the idle queue unless there are pending jobs and
+ * worker can still be committed to a new job before being released.
*/
public void release(Worker worker)
{
- worker.setJob(null);
+ // if worker can still process some jobs assign the first
+ // backlog job to this worker, else reset job count and put
+ // it on the idle queue.
+
+ synchronized (worker)
+ {
+ if (worker.getJobCount()<this.maxJobsPerWorker)
+ {
+ worker.setJob((RenderingJob)queue.pop());
+ return;
+ }
+ else
+ {
+ worker.setJob(null);
+ worker.resetJobCount();
+ }
+ }
synchronized (this.workers)
{
1.1.2.1 +107 -0 jakarta-jetspeed-2/portal/src/java/org/apache/jetspeed/aggregator/impl/Attic/HttpBufferedResponse.java
---------------------------------------------------------------------
To unsubscribe, e-mail: jetspeed-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: jetspeed-dev-help@jakarta.apache.org