Posted to users@tomcat.apache.org by Leonardo Fraga <le...@cedrofinances.com.br> on 2007/12/27 21:50:17 UTC
Re: Re^2: Comet servlet synchronization and flush problems
Hello,
Your posts helped a lot. Thanks everybody!
Now I have a production-level asynchronous servlet (a gift from Santa
Claus?), and I want to share some technical lessons learned:
- You must ensure the response is committed before any asynchronous
write, for example on the BEGIN event, as Filip and Frank said.
- You must ensure there are no asynchronous writes on a response after
its ERROR or END event has been processed.
- You must treat every IOException on an asynchronous write as
unrecoverable (so stop writing to that response). Each of these
IOExceptions will always be followed by an ERROR event.
- You must ensure only one thread at a time accesses the response,
whether to write or to flush.
- You must keep your response collection consistent over time
(add on BEGIN, remove on ERROR and END). synchronized and finally are
good friends for that.
- You must avoid a single global lock strategy (like the one used in the
ChatCometServletExample). Such a lock drastically reduces your throughput.
- It's easier to wrap the responses and build the response collection as
thread-safe capsules. Then you can satisfy all these constraints by design.
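
The constraints above can be sketched roughly as follows. This is only a
minimal illustration, not the code discussed in this thread:
ResponseCapsule and ResponseRegistry are hypothetical names, and a plain
java.io.Writer stands in for the writer obtained from the response on the
BEGIN event (after the response has been committed).

```java
import java.io.IOException;
import java.io.Writer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical thread-safe capsule around one response writer. */
final class ResponseCapsule {
    private final Writer out;                  // assumed: the response writer, obtained on BEGIN
    private final Object lock = new Object();  // one lock per response, no global lock
    private boolean closed;                    // guarded by lock

    ResponseCapsule(Writer out) {
        this.out = out;
    }

    /** Writes and flushes; returns false if the capsule is closed or the write failed. */
    boolean send(String data) {
        synchronized (lock) {
            if (closed) {
                return false;
            }
            try {
                out.write(data);
                out.flush();
                return true;
            } catch (IOException e) {
                closed = true; // treated as unrecoverable: never write here again
                return false;
            }
        }
    }

    /** Meant to be called while handling ERROR and END; later sends become no-ops. */
    void close() {
        synchronized (lock) {
            closed = true;
        }
    }

    boolean isClosed() {
        synchronized (lock) {
            return closed;
        }
    }
}

/** Hypothetical response collection: add on BEGIN, remove on ERROR and END. */
final class ResponseRegistry {
    private final Map<String, ResponseCapsule> capsules = new ConcurrentHashMap<>();

    void register(String key, ResponseCapsule capsule) {
        ResponseCapsule previous = capsules.put(key, capsule);
        if (previous != null) {
            previous.close(); // a replaced capsule must not be written to again
        }
    }

    void unregister(String key) {
        ResponseCapsule capsule = capsules.remove(key);
        if (capsule != null) {
            capsule.close();
        }
    }

    /** Sends to every capsule, drops those that are closed or fail, returns the delivery count. */
    int broadcast(String data) {
        int delivered = 0;
        for (Map.Entry<String, ResponseCapsule> entry : capsules.entrySet()) {
            if (entry.getValue().send(data)) {
                delivered++;
            } else {
                capsules.remove(entry.getKey(), entry.getValue());
            }
        }
        return delivered;
    }

    int size() {
        return capsules.size();
    }
}
```

In a Comet servlet, register(...) would be called on BEGIN and
unregister(...) on ERROR and END, while worker threads would only ever call
send(...) or broadcast(...) and never touch the response directly.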
Thanks again
Hugs,
Leonardo Fraga
Web Developer
Cedro Finances
Filip Hanik - Dev Lists wrote:
> ffd wrote:
>
>> I found the origin of the problem in my case. The problem was neither
>> the synchronization nor the connection being gone. It was that the
>> asynchronous writing started too early, before the connection was
>> completely established, loosely speaking...
>
> just call response.flushBuffer() on the BEGIN event, should get
> everything squared away
>
> Filip
>
>> When I made sure that the writing starts *after* all the headers were
>> set, the NPE was gone. I didn't see the same mistake in Leonardo's
>> code, but maybe it's a hint.
>>
>> BTW, I very much like the Tomcat approach to Comet. In its striking yet
>> powerful simplicity it's much simpler and clearer to handle than
>> Grizzly (we tried Grizzly before choosing to go with Tomcat) or Jetty.
>> I hope that any standardization efforts will go along Tomcat's lines.
>>
>> Frank Felix
>> 1000MIKES.COM
>>
>>
>>
>> Filip Hanik - Dev Lists wrote:
>>
>>
>>> a NPE probably has nothing to do with synchronization, more like you
>>> are trying to write back on an invalid connection (that probably
>>> timed out)
>>>
>>> Filip
>>>
>>> Leonardo Fraga wrote:
>>>
>>>
>>>> Hello,
>>>>
>>>> I'm developing a Java web application that streams real-time
>>>> financial quotes to hundreds of concurrent users. The solution
>>>> involves a long-lived HTTP request serving JavaScript snippets that
>>>> are executed by the browser as they are received, changing the data
>>>> shown to the user.
>>>>
>>>> To do this, I have a Comet servlet, ServletStream, that receives
>>>> the request and registers a new instance of
>>>> JavascriptNotificationListener, a wrapper around the response.writer,
>>>> with a running worker thread, MarketNotifier. When a market event
>>>> occurs, MarketNotifier fires the JavascriptNotificationListener, which
>>>> writes the corresponding JavaScript snippet to the response.writer.
>>>> There is also the Flusher, a running thread that wakes every half
>>>> second and flushes every registered response.writer through
>>>> the same JavascriptNotificationListener. Finally, there is a servlet,
>>>> StreamAction, that submits commands to the MarketNotifier to
>>>> change the market event subscriptions. Some of these commands can
>>>> also fire market events on the JavascriptNotificationListener, and
>>>> they run while the ServletStream is sending data.
>>>>
>>>> Every IO operation on the writer (print, println, flush, checkError)
>>>> is synchronized on a mutex Object held by each instance of
>>>> JavascriptNotificationListener; that is, I have one mutex for each
>>>> response.writer.
>>>>
>>>> When running this solution, I often get the following error:
>>>> java.lang.NullPointerException
>>>>     at org.apache.coyote.http11.InternalNioOutputBuffer.addToBB(InternalNioOutputBuffer.java:607)
>>>>     at org.apache.coyote.http11.InternalNioOutputBuffer.commit(InternalNioOutputBuffer.java:600)
>>>>     at org.apache.coyote.http11.Http11NioProcessor.action(Http11NioProcessor.java:1025)
>>>>     at org.apache.coyote.Response.action(Response.java:183)
>>>>     at org.apache.coyote.Response.sendHeaders(Response.java:379)
>>>>     at org.apache.catalina.connector.OutputBuffer.doFlush(OutputBuffer.java:305)
>>>>     at org.apache.catalina.connector.OutputBuffer.flush(OutputBuffer.java:288)
>>>>     at org.apache.catalina.connector.CoyoteWriter.flush(CoyoteWriter.java:95)
>>>>     at br.com.cedro.stream.JavascriptNotificationListener.syn(JavascriptNotificationListener.java:106)
>>>>     at br.com.cedro.stream.Flusher.run(Flusher.java:42)
>>>>
>>>> This error occurs almost every hour, even in low-usage scenarios
>>>> (five concurrent users). After the first occurrence, every
>>>> other IO operation on all response.writers fails with the same exception.
>>>>
>>>> I'm using apache-tomcat-6.0.14, with default Nio Http Connector
>>>> configuration parameters on server.xml.
>>>>
>>>> I think this problem has to do with synchronization for IO
>>>> operations. Am I right? Or must I simply check some status before
>>>> doing IO?
>>>>
>>>> The Chat Comet Tomcat example synchronizes on a single
>>>> mutex for all response.writers. Must I do the same? Why? Or can I
>>>> use finer-grained mutexes like I did? If so, what is the right
>>>> object to synchronize on? Response? Response.writer?
>>>>
>>>> Has anyone developed Comet servlets and faced the same problem?
>>>>
>>>> Here are the relevant pieces of my source code (I added some
>>>> comments about the problem to improve readability):
>>>>
>>>> public class JavascriptNotificationListener ... {
>>>>     // This is the only class that manipulates response.writer
>>>>     ...
>>>>     private PrintWriter out; // set to response.writer in the constructor
>>>>     private Object writeMonitor = new Object(); // the mutex
>>>>
>>>>     public JavascriptNotificationListener(PrintWriter responseWriter){
>>>>         this.out = responseWriter;
>>>>         // Although at this point only one thread can be here
>>>>         synchronized (writeMonitor) {
>>>>             out.print("<html><head><script language=\"javascript\">");
>>>>         }
>>>>     }
>>>>
>>>>     ...
>>>>     public void flush(){
>>>>         // Called by the Flusher thread every 500 msec
>>>>         synchronized (writeMonitor) {
>>>>             out.print("</script>\n<script language=\"javascript\">");
>>>>             // checkError internally calls flush
>>>>             if(out.checkError()){
>>>>                 log.error("Error flushing event stream.");
>>>>             }
>>>>         }
>>>>     }
>>>>     ...
>>>>     public void syn(){
>>>>         // Called by the Flusher thread every five seconds
>>>>         synchronized (writeMonitor) {
>>>>             out.print("s();");
>>>>             if(out.checkError()){
>>>>                 log.error("Error flushing event stream.");
>>>>             }
>>>>         }
>>>>     }
>>>>     ...
>>>>     public void fire(MarketEvent e){
>>>>         // Called by the worker thread
>>>>         synchronized (writeMonitor) {
>>>>             out.print(translate(e));
>>>>         }
>>>>     }
>>>>     ...
>>>> }
>>>>
>>>> public class ServletStream ... {
>>>>     // The comet stream servlet
>>>>     ...
>>>>     // Will be started only once, on Servlet.init
>>>>     private static final Flusher flusher = new Flusher(500);
>>>>
>>>>     public void event(CometEvent event) throws IOException,
>>>>             ServletException {
>>>>         ...
>>>>         final HttpServletRequest request =
>>>>             event.getHttpServletRequest();
>>>>         final HttpServletResponse response =
>>>>             event.getHttpServletResponse();
>>>>
>>>>         // A user can only have one session on our system
>>>>         String username = (String)
>>>>             request.getSession().getAttribute("username");
>>>>
>>>>         if(event.getEventType() == CometEvent.EventType.BEGIN){
>>>>             if(!AuthStore.checkKey((String)
>>>>                     request.getSession().getAttribute("key"))){
>>>>                 // User unauthenticated
>>>>                 // (simplified here: the real code writes an HTML page)
>>>>                 response.getWriter().println("...Permission Denied...");
>>>>                 event.close();
>>>>             } else {
>>>>                 response.setCharacterEncoding("UTF-8");
>>>>                 // This notify object is active and has a working thread
>>>>                 MarketNotifier notify =
>>>>                     NotifierManager.getOrCreateMarketNotifierForUser(pool, username);
>>>>                 // We wrap our writer inside an object that
>>>>                 // understands javascript serialization
>>>>                 JavascriptNotificationListener jsNot = new
>>>>                     JavascriptNotificationListener(response.getWriter());
>>>>                 // Now we subscribe to receive the fire(MarketEvent)
>>>>                 notify.addNotificationListener(jsNot);
>>>>                 // And subscribe to be called on flush() and syn()
>>>>                 flusher.addFlushable(username, jsNot);
>>>>                 // LoadSubscribedEvents is a command to define the
>>>>                 // initial listeners for specific market events.
>>>>                 // Note that MarketNotifier.process(Command) can
>>>>                 // call JavascriptNotificationListener.fire(MarketEvent),
>>>>                 // that is, here we can have another thread accessing
>>>>                 // response.writer.
>>>>                 // This method is also called by an action servlet
>>>>                 // (see the following class)
>>>>                 notify.process(new LoadSubscribedEvents());
>>>>             }
>>>>         } else if (event.getEventType() == CometEvent.EventType.ERROR
>>>>                 || event.getEventType() == CometEvent.EventType.END) {
>>>>             // Stops receiving syn() and flush()
>>>>             flusher.removeFlushable(username);
>>>>             MarketNotifier notify =
>>>>                 NotifierManager.getOrCreateMarketNotifierForUser(pool, username);
>>>>             notify.process(new UnLoadSubscribedEvents());
>>>>             NotifierManager.freeMarketNotifierForUser(username + ":"
>>>>                 + sessionId);
>>>>             event.close();
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> public class Flusher extends Thread {
>>>>     // Our Flusher thread
>>>>     private int msec; // time interval to flush, set on constructor
>>>>     ...
>>>>
>>>>     public void run(){
>>>>         int multiplier = INTERVAL_TO_SYN_IN_SECS * 1000 / msec;
>>>>         int i = 0;
>>>>         while(running.get()){
>>>>             try {
>>>>                 startFlush = new Date();
>>>>                 // This mutex is also used on addFlushable and
>>>>                 // removeFlushable
>>>>                 synchronized (flushMonitor) {
>>>>                     ++i;
>>>>                     for(Flushable f : flushables){
>>>>                         if(i % multiplier == 0){
>>>>                             f.syn();
>>>>                         } else {
>>>>                             f.flush();
>>>>                         }
>>>>                     }
>>>>                 }
>>>>                 try {
>>>>                     long timeToSleep = startFlush.getTime() + msec -
>>>>                         (new Date()).getTime();
>>>>                     if(timeToSleep > 0){
>>>>                         Thread.sleep(timeToSleep);
>>>>                     } else {
>>>>                         // More than that, indeed :)
>>>>                         log.info("...Flush contention...");
>>>>                     }
>>>>                 } catch (InterruptedException e) {
>>>>                     // Ignore!
>>>>                 }
>>>>             } catch (Throwable t) {
>>>>                 // Here we catch that exception!
>>>>                 log.error("Fail to flush.", t);
>>>>             }
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> public class StreamAction ... {
>>>>     // Our subscription command processor servlet, called while the
>>>>     // stream is served to change its content
>>>>     ...
>>>>
>>>>     public void service(HttpServletRequest request,
>>>>             HttpServletResponse response) throws ServletException, IOException {
>>>>         String username = (String)
>>>>             request.getSession().getAttribute("username");
>>>>         MarketNotifier notify =
>>>>             NotifierManager.getOrCreateMarketNotifierForUser(pool, username);
>>>>         ... // Discovers the command to be sent from request.parameters
>>>>         // Simplified for ease of understanding
>>>>         if(commandName.equals("SubscribeQuoteEvents")){
>>>>             // This can call
>>>>             // JavascriptNotificationListener.fire(MarketEvent)
>>>>             notify.process(new
>>>>                 SubscribeQuoteEvents(commandParameters));
>>>>         }
>>>>         ...
>>>>     }
>>>> }
>>>>
>>>> Any ideas?
>>>>
>>>> Thanks
>>>>
>>>> Leonardo Fraga
>>>> Web Developer
>>>> Cedro Finances
>>>>
>>>>
>>>
>>> ---------------------------------------------------------------------
>>> To start a new topic, e-mail: users@tomcat.apache.org
>>> To unsubscribe, e-mail: users-unsubscribe@tomcat.apache.org
>>> For additional commands, e-mail: users-help@tomcat.apache.org
>>>
>>>
>>>
>>>
>>
>>
>>
>
>
>
>
>
>