You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@tomcat.apache.org by ffd <ff...@gmx.net> on 2007/11/26 21:19:52 UTC

Re^2: Comet servlet synchronization and flush problems


I found the origin of the problem in my case. The problem was neither the
synchronization nor the  connection gone. It was that the asynchronous
writing started too early, before the connection was completely established,
speaking losely... 

When I made sure that the writing starts *after* all the headers were set,
the NPE was gone. I didn't see the same mistake in Leonardo's code, but
maybe it's a hint.

BTW, I very much like the Tomcat approach to Comet. In its striking yet
powerful simplicity it's much simpler and clearer to handle than Grizzly (we
tried Grizzly before choosing to go with Tomcat) or Jetty. I hope that any
standardization efforts will go along Tomcat's lines.

Frank Felix
1000MIKES.COM



Filip Hanik - Dev Lists wrote:
> 
> a NPE probably has nothing to do with synchronization, more like you are 
> trying to write back on an invalid connection (that probably timed out)
> 
> Filip
> 
> Leonardo Fraga wrote:
>> Hello,
>>
>> I'm developing a java web application for finances quotes' real-time 
>> stream, for hundreds of concurrent users. The solution involves a long 
>> http request serving javascript snippets that will be executed by 
>> broser when received, changing the data shown to the user.
>>
>> To do this, I have a Comet servlet, ServletStream, that receives the 
>> request, and registers a new instance of 
>> JavascriptNotificationListener, a wrapper to the response.writer, to a 
>> running worker thread, MarketNotifier, that, when a market event 
>> occurs, fires the JavascriptNotificationListener that writes the 
>> corresponding event javascript snippet to the response.writer. There 
>> is the Flusher, a running thread that awakes every half second and do 
>> a flush on every registered response.writer through the same 
>> JavascriptNotificationListener. And there is a servlet, StreamAction, 
>> that submits commands to the MarketNotifier, to change the market 
>> event subscriptions. Some of these commands can also fire market 
>> events on the JavascriptNotificationListener, and are run while the 
>> ServletStream is sending data.
>>
>> Every IO on the writer (print, println, flush, checkError) is 
>> synchronized on a mutex Object holded by each instance of 
>> JavascriptNotificationListener, that is, I have a mutex for each 
>> response.writer.
>>
>> When running this solution, I often use to get the following error:
>> java.lang.NullPointerException
>>    at 
>> org.apache.coyote.http11.InternalNioOutputBuffer.addToBB(InternalNioOutputBuffer.java:607) 
>>
>>    at 
>> org.apache.coyote.http11.InternalNioOutputBuffer.commit(InternalNioOutputBuffer.java:600) 
>>
>>    at 
>> org.apache.coyote.http11.Http11NioProcessor.action(Http11NioProcessor.java:1025) 
>>
>>    at org.apache.coyote.Response.action(Response.java:183)
>>    at org.apache.coyote.Response.sendHeaders(Response.java:379)
>>    at 
>> org.apache.catalina.connector.OutputBuffer.doFlush(OutputBuffer.java:305)
>>    at 
>> org.apache.catalina.connector.OutputBuffer.flush(OutputBuffer.java:288)
>>    at 
>> org.apache.catalina.connector.CoyoteWriter.flush(CoyoteWriter.java:95)
>>    at 
>> br.com.cedro.stream.JavascriptNotificationListener.syn(JavascriptNotificationListener.java:106) 
>>
>>    at br.com.cedro.stream.Flusher.run(Flusher.java:42)
>>
>> This error occurs even on low usage scenarios (five concurrent users) 
>> almost every hour. After the first occurrence happens, every other IO 
>> on all response.writers catches the same exception.
>>
>> I'm using apache-tomcat-6.0.14, with default Nio Http Connector 
>> configuration parameters on server.xml.
>>
>> I think this problem has to do with synchronization for IO operations. 
>> Am I right? Or must I simply check some status before doing IO?
>>
>> The Chat Comet tomcat example does synchronization on a single mutex 
>> for all response.writers. Must I do the same? Why? Or can I use 
>> higher-granularity mutexes like I did? If so, what is the right object 
>> to synchronize? Response? Response.writer?
>>
>> Has anyone developed Comet servlets and faced the same problem?
>>
>> Here follow the relevant pieces of my source codes (I put some 
>> comments about the problem to increase readability):
>>
>> public class JavascriptNotificationListener ... {
>>    // This is the only class that manipulates response.writer
>>    ...
>>    private PrintWriter out; // set as response.writer on constructor
>>    private Object writeMonitor = new Object(); // the mutex
>>
>>    public JavascriptNotificationListener(PrintWriter responseWriter){
>>        this.out = responseWriter;
>>        synchronized (writeMonitor) { // Althought at this time we have 
>> just one thread here
>>            out.print("<html><head><script language=\"javascript\">");
>>        }
>>    }
>>
>>    ...
>>    public void flush(){
>>        // Called by the Flusher thread every 500 msec
>>        synchronized (writeMonitor) {
>>            out.print("</script>\n<script language=\"javascript\">");
>>            // checkError internally calls flush
>>            if(out.checkError()){
>>                log.error("Error flushing event stream.");
>>            }
>>        }
>>    }
>>    ...
>>    public void syn(){
>>        // Called by the Flusher thread every five seconds
>>        synchronized (writeMonitor) {
>>            out.print("s();");
>>            if(out.checkError()){
>>                log.error("Error flushing event stream.");
>>        }
>>    }
>>    ...
>>    public void fire(MarketEvent e){
>>        // Called by the worker thread
>>        synchronized (writeMonitor) {
>>            out.print(translate(e));
>>        }
>>    }
>>    ...
>> }
>>
>> public class ServletStream ... {
>>    // The comet stream servlet
>>    ...
>>    private static final Flusher flusher = new Flusher(500); // Will be 
>> started only once on Servlet.init
>>
>>    public void event(CometEvent event) throws IOException, 
>> ServletException {
>>        ...
>>        final HttpServletResponse response = 
>> event.getHttpServletResponse();
>>        final HttpServletResponse response = 
>> event.getHttpServletResponse();
>>
>>        // A user can only have one session on our system
>>        String username = (String) 
>> request.getSession().getAttribute("username");
>>
>>        if(event.getEventType() == CometEvent.EventType.BEGIN){
>>            if(!AuthStore.checkKey((String) 
>> request.getSession().getAttribute("key"))){
>>                // User unauthenticated
>>                response.getWriter().println("...Permission 
>> Denied..."); // just to simplify: I write an HTML instead
>>                event.close();
>>            } else {
>>                response.setCharacterEncoding("UTF-8");
>>                // This notify object is active and has a working thread
>>                MarketNotifier notify = 
>> NotifierManager.getOrCreateMarketNotifierForUser(pool, username);
>>                // We wrap our writer inside an object that understands 
>> javascript serialization
>>                JavascriptNotificationListener jsNot = new 
>> JavascriptNotificationListener(response.getWriter());
>>                // Now we subscribe to receive the fire(MarketEvent)
>>                notify.addNotificationListener(jsNot);
>>                // And subscribe to be called on flush() and syn()
>>                flusher.addFlushable(username, jsNot);
>>                // LoadSubscribedEvents is a command to define the 
>> initial listeners for specific market events
>>                // Note that MarketNotifier.process(Command) can call 
>> JavascriptNotificationListener.fire(MarketEvent),
>>                // that is, here we can have another thread acessing 
>> response.writer
>>                // This method is also called by an action servlet (see 
>> the following class)
>>                notify.process(new LoadSubscribedEvents());
>>            }
>>        } else if (event.getEventType() == CometEvent.EventType.ERROR 
>> || event.getEventType() == CometEvent.EventType.END) {
>>            // Stops receiving syn() and flush()
>>            flusher.removeFlushable(username);
>>            MarketNotifier notify = 
>> NotifierManager.getOrCreateMarketNotifierForUser(pool, username);
>>            notify.process(new UnLoadSubscribedEvents());
>>            NotifierManager.freeMarketNotifierForUser(username + ":" + 
>> sessionId);
>>            event.close();
>>        }
>>    }
>> }
>>
>> public class Flusher extends Thread {
>>    // Our Flusher thread
>>    private int msec; // time interval to flush, set on constructor
>>    ...
>>
>>    public void run(){
>>        int multiplier = INTERVAL_TO_SYN_IN_SECS * 1000 / msec;
>>        int i = 0;
>>        while(running.get()){
>>            try {
>>                startFlush = new Date();
>>                synchronized (flushMonitor) { // This mutex is also 
>> used on addFlushable and removeFlushable
>>                    ++i;
>>                    for(Flushable f : flushables){
>>                        if(i % multiplier == 0){
>>                            f.syn();
>>                        } else {
>>                            f.flush();
>>                        }
>>                    }
>>                }
>>                try {
>>                    long timeToSleep = startFlush.getTime() + msec - 
>> (new Date()).getTime();
>>                    if(timeToSleep > 0){
>>                        Thread.sleep(timeToSleep);
>>                    } else {
>>                        log.info("...Flush contention..."); // More 
>> than that, indeed :)
>>                    }
>>                } catch (InterruptedException e) {
>>                    // Ignore!
>>                }
>>            } catch (Throwable t) {
>>                // Here we catch that exception!
>>                log.error("Fail to flush.", t);
>>            }
>>        }
>>    }
>> }
>>
>> public class StreamAction ... {
>>    // Our subscription command processor servlet, called while the 
>> stream is served to change its content
>>    ...
>>
>>    public void service(HttpServletRequest request, HttpServletResponse 
>> response) throws ServletException, IOException {
>>        String username = (String) 
>> request.getSession().getAttribute("username");
>>        MarketNotifier notify = 
>> NotifierManager.getOrCreateMarketNotifierForUser(pool, username);
>>              ... // Discovers the command to be sent from 
>> request.parameters
>>        if(commandName.equals("SubscribeQuoteEvents")){ // Simplified 
>> to easy understanding
>>            // This can call 
>> JavascriptNotificationListener.fire(MarketEvent)
>>            notify.process(new SubscribeQuoteEvents(commandParameters));
>>        }
>>        ...
>>    }
>> }
>>
>> Any ideas?
>>
>> Thanks
>>
>> Leonardo Fraga
>> Web Developer
>> Cedro Finances
>>
>> ------------------------------------------------------------------------
>>
>> No virus found in this incoming message.
>> Checked by AVG Free Edition. 
>> Version: 7.5.503 / Virus Database: 269.16.2/1142 - Release Date:
>> 11/20/2007 5:44 PM
>>   
> 
> 
> ---------------------------------------------------------------------
> To start a new topic, e-mail: users@tomcat.apache.org
> To unsubscribe, e-mail: users-unsubscribe@tomcat.apache.org
> For additional commands, e-mail: users-help@tomcat.apache.org
> 
> 
> 

-- 
View this message in context: http://www.nabble.com/Comet-servlet-synchronization-and-flush-problems-tf4850825.html#a13957675
Sent from the Tomcat - User mailing list archive at Nabble.com.


---------------------------------------------------------------------
To start a new topic, e-mail: users@tomcat.apache.org
To unsubscribe, e-mail: users-unsubscribe@tomcat.apache.org
For additional commands, e-mail: users-help@tomcat.apache.org


Re: Re^2: Comet servlet synchronization and flush problems

Posted by Leonardo Fraga <le...@cedrofinances.com.br>.
Hello,

Your posts helped a lot. Thanks everybody!
Now I have a production level asynchronous servlet (a Santa Claus' 
gift?), and I want to share some technical learned lessons:
- You must ensure the response be commited before asynchronous writes, 
say on the BEGIN event, as said Filip and Frank.
- You must ensure there will not be asynchronous writes on the response 
after respective ERROR and END events were processed.
- You must treat every IOException on asynchronous writes as 
unrecoverable (so, stop writing on that response). Each of these IOE 
will always be followed by an ERROR event.
- You must ensure only one thread will access the response, say for 
write, flush, at each time.
- You must ensure your response collection stays ok along time 
(collection = BEGIN - ERROR - END). Synchronized and finally are good 
friends for that.
- You must avoid a big global lock strategy (like that pointed on the 
ChatCometServletExample). Such a lock drastically reduces your throughput.
- It's easier to wrap the responses and build the response collection as 
thread-safe capsules. Then you can solve all these constraints by design.

Thanks again

Hugs,

Leonardo Fraga
Web Developer
Cedro Finances


Filip Hanik - Dev Lists wrote:

> ffd wrote:
>
>> I found the origin of the problem in my case. The problem was neither 
>> the
>> synchronization nor the  connection gone. It was that the asynchronous
>> writing started too early, before the connection was completely 
>> established,
>> speaking losely...   
>
> just call response.flushBuffer() on the BEGIN event, should get 
> everything squared away
>
> Filip
>
>> When I made sure that the writing starts *after* all the headers were 
>> set,
>> the NPE was gone. I didn't see the same mistake in Leonardo's code, but
>> maybe it's a hint.
>>
>> BTW, I very much like the Tomcat approach to Comet. In its striking yet
>> powerful simplicity it's much simpler and clearer to handle than 
>> Grizzly (we
>> tried Grizzly before choosing to go with Tomcat) or Jetty. I hope 
>> that any
>> standardization efforts will go along Tomcat's lines.
>>
>> Frank Felix
>> 1000MIKES.COM
>>
>>
>>
>> Filip Hanik - Dev Lists wrote:
>>  
>>
>>> a NPE probably has nothing to do with synchronization, more like you 
>>> are trying to write back on an invalid connection (that probably 
>>> timed out)
>>>
>>> Filip
>>>
>>> Leonardo Fraga wrote:
>>>    
>>>
>>>> Hello,
>>>>
>>>> I'm developing a java web application for finances quotes' 
>>>> real-time stream, for hundreds of concurrent users. The solution 
>>>> involves a long http request serving javascript snippets that will 
>>>> be executed by broser when received, changing the data shown to the 
>>>> user.
>>>>
>>>> To do this, I have a Comet servlet, ServletStream, that receives 
>>>> the request, and registers a new instance of 
>>>> JavascriptNotificationListener, a wrapper to the response.writer, 
>>>> to a running worker thread, MarketNotifier, that, when a market 
>>>> event occurs, fires the JavascriptNotificationListener that writes 
>>>> the corresponding event javascript snippet to the response.writer. 
>>>> There is the Flusher, a running thread that awakes every half 
>>>> second and do a flush on every registered response.writer through 
>>>> the same JavascriptNotificationListener. And there is a servlet, 
>>>> StreamAction, that submits commands to the MarketNotifier, to 
>>>> change the market event subscriptions. Some of these commands can 
>>>> also fire market events on the JavascriptNotificationListener, and 
>>>> are run while the ServletStream is sending data.
>>>>
>>>> Every IO on the writer (print, println, flush, checkError) is 
>>>> synchronized on a mutex Object holded by each instance of 
>>>> JavascriptNotificationListener, that is, I have a mutex for each 
>>>> response.writer.
>>>>
>>>> When running this solution, I often use to get the following error:
>>>> java.lang.NullPointerException
>>>>    at 
>>>> org.apache.coyote.http11.InternalNioOutputBuffer.addToBB(InternalNioOutputBuffer.java:607) 
>>>>
>>>>    at 
>>>> org.apache.coyote.http11.InternalNioOutputBuffer.commit(InternalNioOutputBuffer.java:600) 
>>>>
>>>>    at 
>>>> org.apache.coyote.http11.Http11NioProcessor.action(Http11NioProcessor.java:1025) 
>>>>
>>>>    at org.apache.coyote.Response.action(Response.java:183)
>>>>    at org.apache.coyote.Response.sendHeaders(Response.java:379)
>>>>    at 
>>>> org.apache.catalina.connector.OutputBuffer.doFlush(OutputBuffer.java:305) 
>>>>
>>>>    at 
>>>> org.apache.catalina.connector.OutputBuffer.flush(OutputBuffer.java:288) 
>>>>
>>>>    at 
>>>> org.apache.catalina.connector.CoyoteWriter.flush(CoyoteWriter.java:95)
>>>>    at 
>>>> br.com.cedro.stream.JavascriptNotificationListener.syn(JavascriptNotificationListener.java:106) 
>>>>
>>>>    at br.com.cedro.stream.Flusher.run(Flusher.java:42)
>>>>
>>>> This error occurs even on low usage scenarios (five concurrent 
>>>> users) almost every hour. After the first occurrence happens, every 
>>>> other IO on all response.writers catches the same exception.
>>>>
>>>> I'm using apache-tomcat-6.0.14, with default Nio Http Connector 
>>>> configuration parameters on server.xml.
>>>>
>>>> I think this problem has to do with synchronization for IO 
>>>> operations. Am I right? Or must I simply check some status before 
>>>> doing IO?
>>>>
>>>> The Chat Comet tomcat example does synchronization on a single 
>>>> mutex for all response.writers. Must I do the same? Why? Or can I 
>>>> use higher-granularity mutexes like I did? If so, what is the right 
>>>> object to synchronize? Response? Response.writer?
>>>>
>>>> Has anyone developed Comet servlets and faced the same problem?
>>>>
>>>> Here follow the relevant pieces of my source codes (I put some 
>>>> comments about the problem to increase readability):
>>>>
>>>> public class JavascriptNotificationListener ... {
>>>>    // This is the only class that manipulates response.writer
>>>>    ...
>>>>    private PrintWriter out; // set as response.writer on constructor
>>>>    private Object writeMonitor = new Object(); // the mutex
>>>>
>>>>    public JavascriptNotificationListener(PrintWriter responseWriter){
>>>>        this.out = responseWriter;
>>>>        synchronized (writeMonitor) { // Althought at this time we 
>>>> have just one thread here
>>>>            out.print("<html><head><script language=\"javascript\">");
>>>>        }
>>>>    }
>>>>
>>>>    ...
>>>>    public void flush(){
>>>>        // Called by the Flusher thread every 500 msec
>>>>        synchronized (writeMonitor) {
>>>>            out.print("</script>\n<script language=\"javascript\">");
>>>>            // checkError internally calls flush
>>>>            if(out.checkError()){
>>>>                log.error("Error flushing event stream.");
>>>>            }
>>>>        }
>>>>    }
>>>>    ...
>>>>    public void syn(){
>>>>        // Called by the Flusher thread every five seconds
>>>>        synchronized (writeMonitor) {
>>>>            out.print("s();");
>>>>            if(out.checkError()){
>>>>                log.error("Error flushing event stream.");
>>>>        }
>>>>    }
>>>>    ...
>>>>    public void fire(MarketEvent e){
>>>>        // Called by the worker thread
>>>>        synchronized (writeMonitor) {
>>>>            out.print(translate(e));
>>>>        }
>>>>    }
>>>>    ...
>>>> }
>>>>
>>>> public class ServletStream ... {
>>>>    // The comet stream servlet
>>>>    ...
>>>>    private static final Flusher flusher = new Flusher(500); // Will 
>>>> be started only once on Servlet.init
>>>>
>>>>    public void event(CometEvent event) throws IOException, 
>>>> ServletException {
>>>>        ...
>>>>        final HttpServletResponse response = 
>>>> event.getHttpServletResponse();
>>>>        final HttpServletResponse response = 
>>>> event.getHttpServletResponse();
>>>>
>>>>        // A user can only have one session on our system
>>>>        String username = (String) 
>>>> request.getSession().getAttribute("username");
>>>>
>>>>        if(event.getEventType() == CometEvent.EventType.BEGIN){
>>>>            if(!AuthStore.checkKey((String) 
>>>> request.getSession().getAttribute("key"))){
>>>>                // User unauthenticated
>>>>                response.getWriter().println("...Permission 
>>>> Denied..."); // just to simplify: I write an HTML instead
>>>>                event.close();
>>>>            } else {
>>>>                response.setCharacterEncoding("UTF-8");
>>>>                // This notify object is active and has a working 
>>>> thread
>>>>                MarketNotifier notify = 
>>>> NotifierManager.getOrCreateMarketNotifierForUser(pool, username);
>>>>                // We wrap our writer inside an object that 
>>>> understands javascript serialization
>>>>                JavascriptNotificationListener jsNot = new 
>>>> JavascriptNotificationListener(response.getWriter());
>>>>                // Now we subscribe to receive the fire(MarketEvent)
>>>>                notify.addNotificationListener(jsNot);
>>>>                // And subscribe to be called on flush() and syn()
>>>>                flusher.addFlushable(username, jsNot);
>>>>                // LoadSubscribedEvents is a command to define the 
>>>> initial listeners for specific market events
>>>>                // Note that MarketNotifier.process(Command) can 
>>>> call JavascriptNotificationListener.fire(MarketEvent),
>>>>                // that is, here we can have another thread acessing 
>>>> response.writer
>>>>                // This method is also called by an action servlet 
>>>> (see the following class)
>>>>                notify.process(new LoadSubscribedEvents());
>>>>            }
>>>>        } else if (event.getEventType() == 
>>>> CometEvent.EventType.ERROR || event.getEventType() == 
>>>> CometEvent.EventType.END) {
>>>>            // Stops receiving syn() and flush()
>>>>            flusher.removeFlushable(username);
>>>>            MarketNotifier notify = 
>>>> NotifierManager.getOrCreateMarketNotifierForUser(pool, username);
>>>>            notify.process(new UnLoadSubscribedEvents());
>>>>            NotifierManager.freeMarketNotifierForUser(username + ":" 
>>>> + sessionId);
>>>>            event.close();
>>>>        }
>>>>    }
>>>> }
>>>>
>>>> public class Flusher extends Thread {
>>>>    // Our Flusher thread
>>>>    private int msec; // time interval to flush, set on constructor
>>>>    ...
>>>>
>>>>    public void run(){
>>>>        int multiplier = INTERVAL_TO_SYN_IN_SECS * 1000 / msec;
>>>>        int i = 0;
>>>>        while(running.get()){
>>>>            try {
>>>>                startFlush = new Date();
>>>>                synchronized (flushMonitor) { // This mutex is also 
>>>> used on addFlushable and removeFlushable
>>>>                    ++i;
>>>>                    for(Flushable f : flushables){
>>>>                        if(i % multiplier == 0){
>>>>                            f.syn();
>>>>                        } else {
>>>>                            f.flush();
>>>>                        }
>>>>                    }
>>>>                }
>>>>                try {
>>>>                    long timeToSleep = startFlush.getTime() + msec - 
>>>> (new Date()).getTime();
>>>>                    if(timeToSleep > 0){
>>>>                        Thread.sleep(timeToSleep);
>>>>                    } else {
>>>>                        log.info("...Flush contention..."); // More 
>>>> than that, indeed :)
>>>>                    }
>>>>                } catch (InterruptedException e) {
>>>>                    // Ignore!
>>>>                }
>>>>            } catch (Throwable t) {
>>>>                // Here we catch that exception!
>>>>                log.error("Fail to flush.", t);
>>>>            }
>>>>        }
>>>>    }
>>>> }
>>>>
>>>> public class StreamAction ... {
>>>>    // Our subscription command processor servlet, called while the 
>>>> stream is served to change its content
>>>>    ...
>>>>
>>>>    public void service(HttpServletRequest request, 
>>>> HttpServletResponse response) throws ServletException, IOException {
>>>>        String username = (String) 
>>>> request.getSession().getAttribute("username");
>>>>        MarketNotifier notify = 
>>>> NotifierManager.getOrCreateMarketNotifierForUser(pool, username);
>>>>              ... // Discovers the command to be sent from 
>>>> request.parameters
>>>>        if(commandName.equals("SubscribeQuoteEvents")){ // 
>>>> Simplified to easy understanding
>>>>            // This can call 
>>>> JavascriptNotificationListener.fire(MarketEvent)
>>>>            notify.process(new 
>>>> SubscribeQuoteEvents(commandParameters));
>>>>        }
>>>>        ...
>>>>    }
>>>> }
>>>>
>>>> Any ideas?
>>>>
>>>> Thanks
>>>>
>>>> Leonardo Fraga
>>>> Web Developer
>>>> Cedro Finances
>>>>
>>>> ------------------------------------------------------------------------ 
>>>>
>>>>
>>>> No virus found in this incoming message.
>>>> Checked by AVG Free Edition. Version: 7.5.503 / Virus Database: 
>>>> 269.16.2/1142 - Release Date:
>>>> 11/20/2007 5:44 PM
>>>>         
>>>
>>> ---------------------------------------------------------------------
>>> To start a new topic, e-mail: users@tomcat.apache.org
>>> To unsubscribe, e-mail: users-unsubscribe@tomcat.apache.org
>>> For additional commands, e-mail: users-help@tomcat.apache.org
>>>
>>>
>>>
>>>     
>>
>>
>>   
>
>
>
> ---------------------------------------------------------------------
> To start a new topic, e-mail: users@tomcat.apache.org
> To unsubscribe, e-mail: users-unsubscribe@tomcat.apache.org
> For additional commands, e-mail: users-help@tomcat.apache.org
>
>
>


---------------------------------------------------------------------
To start a new topic, e-mail: users@tomcat.apache.org
To unsubscribe, e-mail: users-unsubscribe@tomcat.apache.org
For additional commands, e-mail: users-help@tomcat.apache.org


Re: Re^2: Comet servlet synchronization and flush problems

Posted by Filip Hanik - Dev Lists <de...@hanik.com>.
ffd wrote:
> I found the origin of the problem in my case. The problem was neither the
> synchronization nor the  connection gone. It was that the asynchronous
> writing started too early, before the connection was completely established,
> speaking losely... 
>   
just call response.flushBuffer() on the BEGIN event, should get 
everything squared away

Filip
> When I made sure that the writing starts *after* all the headers were set,
> the NPE was gone. I didn't see the same mistake in Leonardo's code, but
> maybe it's a hint.
>
> BTW, I very much like the Tomcat approach to Comet. In its striking yet
> powerful simplicity it's much simpler and clearer to handle than Grizzly (we
> tried Grizzly before choosing to go with Tomcat) or Jetty. I hope that any
> standardization efforts will go along Tomcat's lines.
>
> Frank Felix
> 1000MIKES.COM
>
>
>
> Filip Hanik - Dev Lists wrote:
>   
>> a NPE probably has nothing to do with synchronization, more like you are 
>> trying to write back on an invalid connection (that probably timed out)
>>
>> Filip
>>
>> Leonardo Fraga wrote:
>>     
>>> Hello,
>>>
>>> I'm developing a java web application for finances quotes' real-time 
>>> stream, for hundreds of concurrent users. The solution involves a long 
>>> http request serving javascript snippets that will be executed by 
>>> broser when received, changing the data shown to the user.
>>>
>>> To do this, I have a Comet servlet, ServletStream, that receives the 
>>> request, and registers a new instance of 
>>> JavascriptNotificationListener, a wrapper to the response.writer, to a 
>>> running worker thread, MarketNotifier, that, when a market event 
>>> occurs, fires the JavascriptNotificationListener that writes the 
>>> corresponding event javascript snippet to the response.writer. There 
>>> is the Flusher, a running thread that awakes every half second and do 
>>> a flush on every registered response.writer through the same 
>>> JavascriptNotificationListener. And there is a servlet, StreamAction, 
>>> that submits commands to the MarketNotifier, to change the market 
>>> event subscriptions. Some of these commands can also fire market 
>>> events on the JavascriptNotificationListener, and are run while the 
>>> ServletStream is sending data.
>>>
>>> Every IO on the writer (print, println, flush, checkError) is 
>>> synchronized on a mutex Object holded by each instance of 
>>> JavascriptNotificationListener, that is, I have a mutex for each 
>>> response.writer.
>>>
>>> When running this solution, I often use to get the following error:
>>> java.lang.NullPointerException
>>>    at 
>>> org.apache.coyote.http11.InternalNioOutputBuffer.addToBB(InternalNioOutputBuffer.java:607) 
>>>
>>>    at 
>>> org.apache.coyote.http11.InternalNioOutputBuffer.commit(InternalNioOutputBuffer.java:600) 
>>>
>>>    at 
>>> org.apache.coyote.http11.Http11NioProcessor.action(Http11NioProcessor.java:1025) 
>>>
>>>    at org.apache.coyote.Response.action(Response.java:183)
>>>    at org.apache.coyote.Response.sendHeaders(Response.java:379)
>>>    at 
>>> org.apache.catalina.connector.OutputBuffer.doFlush(OutputBuffer.java:305)
>>>    at 
>>> org.apache.catalina.connector.OutputBuffer.flush(OutputBuffer.java:288)
>>>    at 
>>> org.apache.catalina.connector.CoyoteWriter.flush(CoyoteWriter.java:95)
>>>    at 
>>> br.com.cedro.stream.JavascriptNotificationListener.syn(JavascriptNotificationListener.java:106) 
>>>
>>>    at br.com.cedro.stream.Flusher.run(Flusher.java:42)
>>>
>>> This error occurs even on low usage scenarios (five concurrent users) 
>>> almost every hour. After the first occurrence happens, every other IO 
>>> on all response.writers catches the same exception.
>>>
>>> I'm using apache-tomcat-6.0.14, with default Nio Http Connector 
>>> configuration parameters on server.xml.
>>>
>>> I think this problem has to do with synchronization for IO operations. 
>>> Am I right? Or must I simply check some status before doing IO?
>>>
>>> The Chat Comet tomcat example does synchronization on a single mutex 
>>> for all response.writers. Must I do the same? Why? Or can I use 
>>> higher-granularity mutexes like I did? If so, what is the right object 
>>> to synchronize? Response? Response.writer?
>>>
>>> Has anyone developed Comet servlets and faced the same problem?
>>>
>>> Here follow the relevant pieces of my source codes (I put some 
>>> comments about the problem to increase readability):
>>>
>>> public class JavascriptNotificationListener ... {
>>>    // This is the only class that manipulates response.writer
>>>    ...
>>>    private PrintWriter out; // set as response.writer on constructor
>>>    private Object writeMonitor = new Object(); // the mutex
>>>
>>>    public JavascriptNotificationListener(PrintWriter responseWriter){
>>>        this.out = responseWriter;
>>>        synchronized (writeMonitor) { // Althought at this time we have 
>>> just one thread here
>>>            out.print("<html><head><script language=\"javascript\">");
>>>        }
>>>    }
>>>
>>>    ...
>>>    public void flush(){
>>>        // Called by the Flusher thread every 500 msec
>>>        synchronized (writeMonitor) {
>>>            out.print("</script>\n<script language=\"javascript\">");
>>>            // checkError internally calls flush
>>>            if(out.checkError()){
>>>                log.error("Error flushing event stream.");
>>>            }
>>>        }
>>>    }
>>>    ...
>>>    public void syn(){
>>>        // Called by the Flusher thread every five seconds
>>>        synchronized (writeMonitor) {
>>>            out.print("s();");
>>>            if(out.checkError()){
>>>                log.error("Error flushing event stream.");
>>>        }
>>>    }
>>>    ...
>>>    public void fire(MarketEvent e){
>>>        // Called by the worker thread
>>>        synchronized (writeMonitor) {
>>>            out.print(translate(e));
>>>        }
>>>    }
>>>    ...
>>> }
>>>
>>> public class ServletStream ... {
>>>    // The comet stream servlet
>>>    ...
>>>    private static final Flusher flusher = new Flusher(500); // Will be 
>>> started only once on Servlet.init
>>>
>>>    public void event(CometEvent event) throws IOException, 
>>> ServletException {
>>>        ...
>>>        final HttpServletResponse response = 
>>> event.getHttpServletResponse();
>>>        final HttpServletResponse response = 
>>> event.getHttpServletResponse();
>>>
>>>        // A user can only have one session on our system
>>>        String username = (String) 
>>> request.getSession().getAttribute("username");
>>>
>>>        if(event.getEventType() == CometEvent.EventType.BEGIN){
>>>            if(!AuthStore.checkKey((String) 
>>> request.getSession().getAttribute("key"))){
>>>                // User unauthenticated
>>>                response.getWriter().println("...Permission 
>>> Denied..."); // just to simplify: I write an HTML instead
>>>                event.close();
>>>            } else {
>>>                response.setCharacterEncoding("UTF-8");
>>>                // This notify object is active and has a working thread
>>>                MarketNotifier notify = 
>>> NotifierManager.getOrCreateMarketNotifierForUser(pool, username);
>>>                // We wrap our writer inside an object that understands 
>>> javascript serialization
>>>                JavascriptNotificationListener jsNot = new 
>>> JavascriptNotificationListener(response.getWriter());
>>>                // Now we subscribe to receive the fire(MarketEvent)
>>>                notify.addNotificationListener(jsNot);
>>>                // And subscribe to be called on flush() and syn()
>>>                flusher.addFlushable(username, jsNot);
>>>                // LoadSubscribedEvents is a command to define the 
>>> initial listeners for specific market events
>>>                // Note that MarketNotifier.process(Command) can call 
>>> JavascriptNotificationListener.fire(MarketEvent),
>>>                // that is, here we can have another thread acessing 
>>> response.writer
>>>                // This method is also called by an action servlet (see 
>>> the following class)
>>>                notify.process(new LoadSubscribedEvents());
>>>            }
>>>        } else if (event.getEventType() == CometEvent.EventType.ERROR 
>>> || event.getEventType() == CometEvent.EventType.END) {
>>>            // Stops receiving syn() and flush()
>>>            flusher.removeFlushable(username);
>>>            MarketNotifier notify = 
>>> NotifierManager.getOrCreateMarketNotifierForUser(pool, username);
>>>            notify.process(new UnLoadSubscribedEvents());
>>>            NotifierManager.freeMarketNotifierForUser(username + ":" + 
>>> sessionId);
>>>            event.close();
>>>        }
>>>    }
>>> }
>>>
>>> public class Flusher extends Thread {
>>>    // Our Flusher thread
>>>    private int msec; // time interval to flush, set on constructor
>>>    ...
>>>
>>>    public void run(){
>>>        int multiplier = INTERVAL_TO_SYN_IN_SECS * 1000 / msec;
>>>        int i = 0;
>>>        while(running.get()){
>>>            try {
>>>                startFlush = new Date();
>>>                synchronized (flushMonitor) { // This mutex is also 
>>> used on addFlushable and removeFlushable
>>>                    ++i;
>>>                    for(Flushable f : flushables){
>>>                        if(i % multiplier == 0){
>>>                            f.syn();
>>>                        } else {
>>>                            f.flush();
>>>                        }
>>>                    }
>>>                }
>>>                try {
>>>                    long timeToSleep = startFlush.getTime() + msec - 
>>> (new Date()).getTime();
>>>                    if(timeToSleep > 0){
>>>                        Thread.sleep(timeToSleep);
>>>                    } else {
>>>                        log.info("...Flush contention..."); // More 
>>> than that, indeed :)
>>>                    }
>>>                } catch (InterruptedException e) {
>>>                    // Ignore!
>>>                }
>>>            } catch (Throwable t) {
>>>                // Here we catch that exception!
>>>                log.error("Fail to flush.", t);
>>>            }
>>>        }
>>>    }
>>> }
>>>
>>> public class StreamAction ... {
>>>    // Our subscription command processor servlet, called while the 
>>> stream is served to change its content
>>>    ...
>>>
>>>    public void service(HttpServletRequest request, HttpServletResponse 
>>> response) throws ServletException, IOException {
>>>        String username = (String) 
>>> request.getSession().getAttribute("username");
>>>        MarketNotifier notify = 
>>> NotifierManager.getOrCreateMarketNotifierForUser(pool, username);
>>>              ... // Discovers the command to be sent from 
>>> request.parameters
>>>        if(commandName.equals("SubscribeQuoteEvents")){ // Simplified 
>>> to easy understanding
>>>            // This can call 
>>> JavascriptNotificationListener.fire(MarketEvent)
>>>            notify.process(new SubscribeQuoteEvents(commandParameters));
>>>        }
>>>        ...
>>>    }
>>> }
>>>
>>> Any ideas?
>>>
>>> Thanks
>>>
>>> Leonardo Fraga
>>> Web Developer
>>> Cedro Finances
>>>
>>> ------------------------------------------------------------------------
>>>
>>> No virus found in this incoming message.
>>> Checked by AVG Free Edition. 
>>> Version: 7.5.503 / Virus Database: 269.16.2/1142 - Release Date:
>>> 11/20/2007 5:44 PM
>>>   
>>>       
>> ---------------------------------------------------------------------
>> To start a new topic, e-mail: users@tomcat.apache.org
>> To unsubscribe, e-mail: users-unsubscribe@tomcat.apache.org
>> For additional commands, e-mail: users-help@tomcat.apache.org
>>
>>
>>
>>     
>
>   


---------------------------------------------------------------------
To start a new topic, e-mail: users@tomcat.apache.org
To unsubscribe, e-mail: users-unsubscribe@tomcat.apache.org
For additional commands, e-mail: users-help@tomcat.apache.org