Posted to users@tomcat.apache.org by Leonardo Fraga <le...@cedrofinances.com.br> on 2007/12/27 21:50:17 UTC

Re: Re^2: Comet servlet synchronization and flush problems

Hello,

Your posts helped a lot. Thanks everybody!
Now I have a production-level asynchronous servlet (a gift from Santa 
Claus?), and I want to share some technical lessons learned:
- You must ensure the response is committed before any asynchronous 
write, for example on the BEGIN event, as Filip and Frank said.
- You must ensure there are no asynchronous writes on a response after 
its ERROR or END event has been processed.
- You must treat every IOException on an asynchronous write as 
unrecoverable (so stop writing to that response). Each such IOException 
will always be followed by an ERROR event.
- You must ensure that only one thread at a time accesses the response 
(for write, flush, and so on).
- You must keep your response collection consistent over time 
(responses are added on BEGIN and removed on ERROR or END). 
synchronized and finally are good friends for that.
- You must avoid a single big global lock (like the one in the 
ChatCometServletExample). Such a lock drastically reduces your throughput.
- It's easier to wrap the responses and build the response collection as 
thread-safe capsules. Then you can satisfy all these constraints by design.
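
The points above could be sketched roughly as follows. This is only an 
illustration under assumptions, not the code running in production: the 
names ResponseCapsule, ResponseRegistry, send, register, unregister and 
broadcast are hypothetical, and the sketch wraps a plain OutputStream 
(for example response.getOutputStream()) because a PrintWriter does not 
throw IOException and reports failures through checkError() instead. It 
does not depend on Tomcat; in a Comet servlet, register would be called 
on BEGIN after response.flushBuffer(), and unregister on ERROR and END.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical thread-safe capsule around one response stream. */
final class ResponseCapsule {
    private final OutputStream out;           // e.g. response.getOutputStream()
    private final Object lock = new Object(); // one lock per response, no global lock
    private boolean closed;                   // guarded by lock

    ResponseCapsule(OutputStream out) {
        this.out = out;
    }

    /** Writes and flushes; returns false if the capsule is (or just became) dead. */
    boolean send(String data) {
        synchronized (lock) {
            if (closed) {
                return false; // never write after ERROR, END or a failed write
            }
            try {
                out.write(data.getBytes(StandardCharsets.UTF_8));
                out.flush();
                return true;
            } catch (IOException e) {
                closed = true; // treat every IOException as unrecoverable
                return false;
            }
        }
    }

    /** Call when the ERROR or END event is processed. */
    void close() {
        synchronized (lock) {
            closed = true;
        }
    }
}

/** Hypothetical "response collection": registered on BEGIN, removed on ERROR or END. */
final class ResponseRegistry {
    private final ConcurrentMap<String, ResponseCapsule> capsules =
            new ConcurrentHashMap<>();

    /** BEGIN: registers a capsule, closing any previous one stored under the same key. */
    ResponseCapsule register(String key, OutputStream out) {
        ResponseCapsule fresh = new ResponseCapsule(out);
        ResponseCapsule old = capsules.put(key, fresh);
        if (old != null) {
            old.close();
        }
        return fresh;
    }

    /** ERROR or END: removes only this exact capsule, then closes it. */
    void unregister(String key, ResponseCapsule capsule) {
        capsules.remove(key, capsule);
        capsule.close();
    }

    /** Worker threads call this; only the per-capsule locks are taken. */
    void broadcast(String data) {
        for (Map.Entry<String, ResponseCapsule> e : capsules.entrySet()) {
            if (!e.getValue().send(data)) {
                capsules.remove(e.getKey(), e.getValue()); // drop dead responses
            }
        }
    }

    int size() {
        return capsules.size();
    }
}
```

With this shape, a flusher or notifier thread only calls 
registry.broadcast(...) (or capsule.send(...) for a single user) and 
never touches the response directly, so a slow or broken client blocks 
only its own capsule.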

Thanks again

Hugs,

Leonardo Fraga
Web Developer
Cedro Finances


Filip Hanik - Dev Lists wrote:

> ffd wrote:
>
>> I found the origin of the problem in my case. The problem was neither 
>> the
>> synchronization nor the  connection gone. It was that the asynchronous
>> writing started too early, before the connection was completely 
>> established,
>> speaking loosely...
>
> just call response.flushBuffer() on the BEGIN event, should get 
> everything squared away
>
> Filip
>
>> When I made sure that the writing starts *after* all the headers were 
>> set,
>> the NPE was gone. I didn't see the same mistake in Leonardo's code, but
>> maybe it's a hint.
>>
>> BTW, I very much like the Tomcat approach to Comet. In its striking yet
>> powerful simplicity it's much simpler and clearer to handle than 
>> Grizzly (we
>> tried Grizzly before choosing to go with Tomcat) or Jetty. I hope 
>> that any
>> standardization efforts will go along Tomcat's lines.
>>
>> Frank Felix
>> 1000MIKES.COM
>>
>>
>>
>> Filip Hanik - Dev Lists wrote:
>>  
>>
>>> An NPE probably has nothing to do with synchronization; more likely 
>>> you are trying to write back on an invalid connection (that 
>>> probably timed out)
>>>
>>> Filip
>>>
>>> Leonardo Fraga wrote:
>>>    
>>>
>>>> Hello,
>>>>
>>>> I'm developing a Java web application that streams financial 
>>>> quotes in real time to hundreds of concurrent users. The solution 
>>>> involves a long-lived HTTP request serving JavaScript snippets that 
>>>> are executed by the browser as they arrive, changing the data shown 
>>>> to the user.
>>>>
>>>> To do this, I have a Comet servlet, ServletStream, that receives 
>>>> the request and registers a new instance of 
>>>> JavascriptNotificationListener, a wrapper around response.writer, 
>>>> with a running worker thread, MarketNotifier. When a market event 
>>>> occurs, MarketNotifier fires the JavascriptNotificationListener, 
>>>> which writes the corresponding JavaScript snippet to 
>>>> response.writer. There is also the Flusher, a running thread that 
>>>> wakes every half second and flushes every registered 
>>>> response.writer through the same JavascriptNotificationListener. 
>>>> Finally, there is a servlet, StreamAction, that submits commands 
>>>> to the MarketNotifier to change the market event subscriptions. 
>>>> Some of these commands can also fire market events on the 
>>>> JavascriptNotificationListener, and they run while ServletStream 
>>>> is sending data.
>>>>
>>>> Every IO operation on the writer (print, println, flush, 
>>>> checkError) is synchronized on a mutex Object held by each instance 
>>>> of JavascriptNotificationListener; that is, I have one mutex for 
>>>> each response.writer.
>>>>
>>>> When running this solution, I often get the following error:
>>>> java.lang.NullPointerException
>>>>    at 
>>>> org.apache.coyote.http11.InternalNioOutputBuffer.addToBB(InternalNioOutputBuffer.java:607) 
>>>>
>>>>    at 
>>>> org.apache.coyote.http11.InternalNioOutputBuffer.commit(InternalNioOutputBuffer.java:600) 
>>>>
>>>>    at 
>>>> org.apache.coyote.http11.Http11NioProcessor.action(Http11NioProcessor.java:1025) 
>>>>
>>>>    at org.apache.coyote.Response.action(Response.java:183)
>>>>    at org.apache.coyote.Response.sendHeaders(Response.java:379)
>>>>    at 
>>>> org.apache.catalina.connector.OutputBuffer.doFlush(OutputBuffer.java:305) 
>>>>
>>>>    at 
>>>> org.apache.catalina.connector.OutputBuffer.flush(OutputBuffer.java:288) 
>>>>
>>>>    at 
>>>> org.apache.catalina.connector.CoyoteWriter.flush(CoyoteWriter.java:95)
>>>>    at 
>>>> br.com.cedro.stream.JavascriptNotificationListener.syn(JavascriptNotificationListener.java:106) 
>>>>
>>>>    at br.com.cedro.stream.Flusher.run(Flusher.java:42)
>>>>
>>>> This error occurs even on low usage scenarios (five concurrent 
>>>> users) almost every hour. After the first occurrence happens, every 
>>>> other IO on all response.writers catches the same exception.
>>>>
>>>> I'm using apache-tomcat-6.0.14, with default Nio Http Connector 
>>>> configuration parameters on server.xml.
>>>>
>>>> I think this problem has to do with synchronization for IO 
>>>> operations. Am I right? Or must I simply check some status before 
>>>> doing IO?
>>>>
>>>> The Chat Comet tomcat example does synchronization on a single 
>>>> mutex for all response.writers. Must I do the same? Why? Or can I 
>>>> use higher-granularity mutexes like I did? If so, what is the right 
>>>> object to synchronize? Response? Response.writer?
>>>>
>>>> Has anyone developed Comet servlets and faced the same problem?
>>>>
>>>> Here are the relevant pieces of my source code (I added some 
>>>> comments about the problem to improve readability):
>>>>
>>>> public class JavascriptNotificationListener ... {
>>>>    // This is the only class that manipulates response.writer
>>>>    ...
>>>>    private PrintWriter out; // set as response.writer on constructor
>>>>    private Object writeMonitor = new Object(); // the mutex
>>>>
>>>>    public JavascriptNotificationListener(PrintWriter responseWriter){
>>>>        this.out = responseWriter;
>>>>        // Although at this time we have just one thread here
>>>>        synchronized (writeMonitor) {
>>>>            out.print("<html><head><script language=\"javascript\">");
>>>>        }
>>>>    }
>>>>
>>>>    ...
>>>>    public void flush(){
>>>>        // Called by the Flusher thread every 500 msec
>>>>        synchronized (writeMonitor) {
>>>>            out.print("</script>\n<script language=\"javascript\">");
>>>>            // checkError internally calls flush
>>>>            if(out.checkError()){
>>>>                log.error("Error flushing event stream.");
>>>>            }
>>>>        }
>>>>    }
>>>>    ...
>>>>    public void syn(){
>>>>        // Called by the Flusher thread every five seconds
>>>>        synchronized (writeMonitor) {
>>>>            out.print("s();");
>>>>            if(out.checkError()){
>>>>                log.error("Error flushing event stream.");
>>>>            }
>>>>        }
>>>>    }
>>>>    ...
>>>>    public void fire(MarketEvent e){
>>>>        // Called by the worker thread
>>>>        synchronized (writeMonitor) {
>>>>            out.print(translate(e));
>>>>        }
>>>>    }
>>>>    ...
>>>> }
>>>>
>>>> public class ServletStream ... {
>>>>    // The comet stream servlet
>>>>    ...
>>>>    // Will be started only once on Servlet.init
>>>>    private static final Flusher flusher = new Flusher(500);
>>>>
>>>>    public void event(CometEvent event) throws IOException, 
>>>> ServletException {
>>>>        ...
>>>>        final HttpServletRequest request = 
>>>> event.getHttpServletRequest();
>>>>        final HttpServletResponse response = 
>>>> event.getHttpServletResponse();
>>>>
>>>>        // A user can only have one session on our system
>>>>        String username = (String) 
>>>> request.getSession().getAttribute("username");
>>>>
>>>>        if(event.getEventType() == CometEvent.EventType.BEGIN){
>>>>            if(!AuthStore.checkKey((String) 
>>>> request.getSession().getAttribute("key"))){
>>>>                // User unauthenticated
>>>>                response.getWriter().println("...Permission 
>>>> Denied..."); // just to simplify: I write an HTML instead
>>>>                event.close();
>>>>            } else {
>>>>                response.setCharacterEncoding("UTF-8");
>>>>                // This notify object is active and has a working 
>>>> thread
>>>>                MarketNotifier notify = 
>>>> NotifierManager.getOrCreateMarketNotifierForUser(pool, username);
>>>>                // We wrap our writer inside an object that 
>>>> understands javascript serialization
>>>>                JavascriptNotificationListener jsNot = new 
>>>> JavascriptNotificationListener(response.getWriter());
>>>>                // Now we subscribe to receive the fire(MarketEvent)
>>>>                notify.addNotificationListener(jsNot);
>>>>                // And subscribe to be called on flush() and syn()
>>>>                flusher.addFlushable(username, jsNot);
>>>>                // LoadSubscribedEvents is a command to define the 
>>>> initial listeners for specific market events
>>>>                // Note that MarketNotifier.process(Command) can 
>>>> call JavascriptNotificationListener.fire(MarketEvent),
>>>>                // that is, here we can have another thread accessing 
>>>> response.writer
>>>>                // This method is also called by an action servlet 
>>>> (see the following class)
>>>>                notify.process(new LoadSubscribedEvents());
>>>>            }
>>>>        } else if (event.getEventType() == 
>>>> CometEvent.EventType.ERROR || event.getEventType() == 
>>>> CometEvent.EventType.END) {
>>>>            // Stops receiving syn() and flush()
>>>>            flusher.removeFlushable(username);
>>>>            MarketNotifier notify = 
>>>> NotifierManager.getOrCreateMarketNotifierForUser(pool, username);
>>>>            notify.process(new UnLoadSubscribedEvents());
>>>>            NotifierManager.freeMarketNotifierForUser(username + ":" 
>>>> + sessionId);
>>>>            event.close();
>>>>        }
>>>>    }
>>>> }
>>>>
>>>> public class Flusher extends Thread {
>>>>    // Our Flusher thread
>>>>    private int msec; // time interval to flush, set on constructor
>>>>    ...
>>>>
>>>>    public void run(){
>>>>        int multiplier = INTERVAL_TO_SYN_IN_SECS * 1000 / msec;
>>>>        int i = 0;
>>>>        while(running.get()){
>>>>            try {
>>>>                startFlush = new Date();
>>>>                synchronized (flushMonitor) { // This mutex is also 
>>>> used on addFlushable and removeFlushable
>>>>                    ++i;
>>>>                    for(Flushable f : flushables){
>>>>                        if(i % multiplier == 0){
>>>>                            f.syn();
>>>>                        } else {
>>>>                            f.flush();
>>>>                        }
>>>>                    }
>>>>                }
>>>>                try {
>>>>                    long timeToSleep = startFlush.getTime() + msec - 
>>>> (new Date()).getTime();
>>>>                    if(timeToSleep > 0){
>>>>                        Thread.sleep(timeToSleep);
>>>>                    } else {
>>>>                        log.info("...Flush contention..."); // More 
>>>> than that, indeed :)
>>>>                    }
>>>>                } catch (InterruptedException e) {
>>>>                    // Ignore!
>>>>                }
>>>>            } catch (Throwable t) {
>>>>                // Here we catch that exception!
>>>>                log.error("Fail to flush.", t);
>>>>            }
>>>>        }
>>>>    }
>>>> }
>>>>
>>>> public class StreamAction ... {
>>>>    // Our subscription command processor servlet, called while the 
>>>> stream is served to change its content
>>>>    ...
>>>>
>>>>    public void service(HttpServletRequest request, 
>>>> HttpServletResponse response) throws ServletException, IOException {
>>>>        String username = (String) 
>>>> request.getSession().getAttribute("username");
>>>>        MarketNotifier notify = 
>>>> NotifierManager.getOrCreateMarketNotifierForUser(pool, username);
>>>>              ... // Discovers the command to be sent from 
>>>> request.parameters
>>>>        // Simplified for easier understanding
>>>>        if(commandName.equals("SubscribeQuoteEvents")){
>>>>            // This can call 
>>>> JavascriptNotificationListener.fire(MarketEvent)
>>>>            notify.process(new 
>>>> SubscribeQuoteEvents(commandParameters));
>>>>        }
>>>>        ...
>>>>    }
>>>> }
>>>>
>>>> Any ideas?
>>>>
>>>> Thanks
>>>>
>>>> Leonardo Fraga
>>>> Web Developer
>>>> Cedro Finances
>>>>
>>>
>>> ---------------------------------------------------------------------
>>> To start a new topic, e-mail: users@tomcat.apache.org
>>> To unsubscribe, e-mail: users-unsubscribe@tomcat.apache.org
>>> For additional commands, e-mail: users-help@tomcat.apache.org
>>>
>>>
>>>
>>>     
>>
>>
>>   
>
>
>
>
>
>

