You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@mina.apache.org by "Ilya Ivanov (JIRA)" <ji...@apache.org> on 2011/07/29 04:48:09 UTC

[jira] [Created] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

ProtocolEncoderOutputImpl isn't thread-safe
-------------------------------------------

                 Key: DIRMINA-845
                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
             Project: MINA
          Issue Type: Bug
          Components: Filter
    Affects Versions: 2.0.4
            Reporter: Ilya Ivanov


ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).

E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).

Here the fragment of flushing code:

while (!bufferQueue.isEmpty()) {
  Object encodedMessage = bufferQueue.poll();
                
  if (encodedMessage == null) {
    break;
  }

  // Flush only when the buffer has remaining.
  if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
    SocketAddress destination = writeRequest.getDestination();
    WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 

    nextFilter.filterWrite(session, encodedWriteRequest);
  }
} 

Suppose original packets sequence is A, B, ...
Concurrent run of flushing may proceed as following:

thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
...
thread-2: nextFilter.filterWrite(...); // writes B packet
thread-1: nextFilter.filterWrite(...); // writes A packet

so, resulting sequence will B, A

It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Ilya Ivanov (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072943#comment-13072943 ] 

Ilya Ivanov commented on DIRMINA-845:
-------------------------------------

Actually, I already solved this problem in my particular case by sub-classing ProtocolCodecFilter and providing custom implementation of ProtocolEncoderOutput. 

And I was disappointed, there is no easy way to provide custom ProtocolEncoderOutput impl. I was forced to override filterWrite, and messageSent and re-implement some nested classes, but really I just need that getEncoderOut would be protected and work as ProtocolEncoderOutput factory.

> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Ilya Ivanov (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072949#comment-13072949 ] 

Ilya Ivanov commented on DIRMINA-845:
-------------------------------------

I'm pleasured we can understand each other at last!
Sure, I try to test when ready.
Thank you!

> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Issue Comment Edited] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Ilya Ivanov (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072905#comment-13072905 ] 

Ilya Ivanov edited comment on DIRMINA-845 at 7/29/11 4:43 PM:
--------------------------------------------------------------

"In your use case, you are assuming that frames coming from session 1 *and* from session 2 are in sync"

No, frames come async. Order of frames itself doesn't matter but order of chunks of one frame do matter.
And I've ordered chunks when writing to ProtocolEncoderOutput, in bufferQueue chunks placed in right order but when flushing order is distorted.

      was (Author: ilya.a.ivanov):
    "In your use case, you are assuming that frames coming from session 1 *and* from session 2 are in sync"

No, frames come async. Order of frames itself doesn't matter but order of chunks of one frame do matter.
  
> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Ilya Ivanov (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072905#comment-13072905 ] 

Ilya Ivanov commented on DIRMINA-845:
-------------------------------------

"In your use case, you are assuming that frames coming from session 1 *and* from session 2 are in sync"

No, frames come async. Order of frames itself doesn't matter but order of chunks of one frame do matter.

> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Emmanuel Lecharny (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072660#comment-13072660 ] 

Emmanuel Lecharny commented on DIRMINA-845:
-------------------------------------------

Are you using an executor in your chain ?

> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Issue Comment Edited] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Ilya Ivanov (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072871#comment-13072871 ] 

Ilya Ivanov edited comment on DIRMINA-845 at 7/29/11 4:19 PM:
--------------------------------------------------------------

Let suppose we have 3 session (3 rtmp connections) 1, 2 and 3. Session 1 receives video frame and re-sends it to session 3, session 2 receives another video frame and re-send it to session 3 too. All sessions have different io-processor threads.

Note video frame represented as sequence of several messages in MINA terms (chunks). Frame is sent further when it received completely. When sending it is splitted on chunks again.

So, two incoming frames processed by different threads will concurrently pass through session 3 filter chain chunk by chunk. Exactly chunking leads to issue above. Flushing may break chunks order.

Let frame F1 chunked as F1A, F1B and F2 as F2A, F2B. They might be written to ProtocolEncoderOutput in such order F1A, F2A, F2B, F1B for example. It's ok that chunks of different frames are mixed, but important that B chunk follows after A for each frame. When flush runs concurrently the above sequence might be written to next filter as  F1A, F2B, F2A, F1B - wrong: F2B precedes  F2A!


I'm not sure this is right using of MINA or not but such implementation I saw in red5...

http://code.google.com/p/red5/source/browse/java/server/trunk/src/org/red5/server/net/rtmp/codec/RTMPMinaProtocolEncoder.java  lines 50 and 70

I tried to eliminate entire connection lock and replace it on per-channel lock (RTMP itself allows this). Also I add chunking which didn't present in original code.

      was (Author: ilya.a.ivanov):
    Let suppose we have 3 session (3 rtmp connections) 1, 2 and 3. Session 1 receives video frame and re-sends it to session 3, session 2 receives another video frame and re-send it to session 3 too. All sessions have different io-processor threads.

Note video frame represented as sequence of several messages in MINA terms (chunks) and frame is sent further when it received completely. When sending it is splitted on chunks again.

So, two incoming frames processed by different threads will concurrently pass through session 3 filter chain chunk by chunk. Exactly chunking leads to issue above. Flushing may break chunks order.

I'm not sure this is right using of MINA or not but such implementation I saw in red5...

http://code.google.com/p/red5/source/browse/java/server/trunk/src/org/red5/server/net/rtmp/codec/RTMPMinaProtocolEncoder.java  lines 50 and 70

I tried to eliminate entire connection lock and replace it on per-channel lock (RTMP itself allows this). Also I add chunking which didn't present in original code.
  
> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Issue Comment Edited] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Ilya Ivanov (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072696#comment-13072696 ] 

Ilya Ivanov edited comment on DIRMINA-845 at 7/29/11 6:24 AM:
--------------------------------------------------------------

Actually, it isn't my chain. I've found this problem when trying to optimize red5 media-server. 
AFAIU, in red5 "serial" chain is used, i.e. all filters are called in writer thread (there is no other executors).

      was (Author: ilya.a.ivanov):
    Actually, it isn't my chain. I've found this problem when trying to optimize red5 media-server. 
AFAIU, in red5 "linear" chain is used, i.e. all filters are called in writer thread (there is no other executors).
  
> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Issue Comment Edited] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Ilya Ivanov (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072871#comment-13072871 ] 

Ilya Ivanov edited comment on DIRMINA-845 at 7/29/11 4:08 PM:
--------------------------------------------------------------

Let suppose we have 3 session (3 rtmp connections) 1, 2 and 3. Session 1 receives video frame and re-sends it to session 3, session 2 receives another video frame and re-send it to session 3 too. All sessions have different io-processor threads.

Note video frame represented as sequence of several messages in MINA terms (chunks) and frame is sent further when it received completely. When sending it is splitted on chunks again.

So, two incoming frames processed by different threads will concurrently pass through session 3 filter chain chunk by chunk. Exactly chunking leads to issue above. Flushing may break chunks order.

I'm not sure this is right using of MINA or not but such implementation I saw in red5...

http://code.google.com/p/red5/source/browse/java/server/trunk/src/org/red5/server/net/rtmp/codec/RTMPMinaProtocolEncoder.java  lines 50 and 70

I tried to eliminate entire connection lock and replace it on per-channel lock (RTMP itself allows this). Also I add chunking which didn't present in original code.

      was (Author: ilya.a.ivanov):
    Let suppose we have 3 session (3 rtmp connections) 1, 2 and 3. Session 1 receives video frame and re-sends it to session 3, session 2 receives another video frame and re-send it to session 3 too. All sessions have different io-processor threads.

So, two incoming messages processed by different threads will pass through session 3 filter chain concurrently and leads to above issue on flushing buffer queue in ProtocolCodecFilter.filterWrite.

I'm not sure this is right using of MINA or not but such implementation I saw in red5...

http://code.google.com/p/red5/source/browse/java/server/trunk/src/org/red5/server/net/rtmp/codec/RTMPMinaProtocolEncoder.java  lines 50 and 70

I tried to eliminate entire connection lock and replace it on per-channel lock (RTMP itself allows this).
  
> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Dominic Williams (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13169893#comment-13169893 ] 

Dominic Williams commented on DIRMINA-845:
------------------------------------------

Ilya perhaps we can coordinate to create a solution for Red5.

Take a look at related issue regarding how serialization here can lead to deadlock http://code.google.com/p/red5/issues/detail?id=164
                
> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Emmanuel Lecharny (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072787#comment-13072787 ] 

Emmanuel Lecharny commented on DIRMINA-845:
-------------------------------------------

Ok,

when you send a message, you do it using the thread that was used to process the incoming request. This thread has been selected when the session n which this message has arrived was activated. 

If you haven't changed anything (like, adding an executor in the chain), then an incoming message for a specific session will *always* use the same thread, so there is no reason a message B can be written before message A, as the thread isn't available before message A is injected into the queue.

That's the theory, and trust me, it works. Now, if I don't have the code you are playing with, I won't be able to explain why you see some concurrent issues.

> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Emmanuel Lecharny (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072913#comment-13072913 ] 

Emmanuel Lecharny commented on DIRMINA-845:
-------------------------------------------

About the frame order :
- I think I misunderstood the issue. What you are saying is that the frames coming from a session1 in the order (A,B) are pushed in this order to session 3, but are sometime transmitted in the reverse order (B,A), right ?

How do you write the frames to the session 3 ?

> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Emmanuel Lecharny (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13073136#comment-13073136 ] 

Emmanuel Lecharny commented on DIRMINA-845:
-------------------------------------------

Looking at the current coe, there is an (undocumented) way to switch the ProtocolDecoderOutput class : you can set the "encoderOut" session property associated with your implementation of the ProtocolEncoderOutput interface. This has to be done when the SessionCreated event is processed in your handler.

It should work...

> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Paul Gregoire (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13073226#comment-13073226 ] 

Paul Gregoire commented on DIRMINA-845:
---------------------------------------

Ilya, I hope to see a patch for Red5 when you get this sorted out :)

> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Ilya Ivanov (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13073581#comment-13073581 ] 

Ilya Ivanov commented on DIRMINA-845:
-------------------------------------

We are using red5 r4047 with MINA 2.0.0 RC1 and I couldn't just set "encoderOut" property because explicit type cast to ProtocolEncoderOutputImpl present in ProtocolCodecFilter.filterWrite

In MINA 2.0.4 situation is a bit better - type cast to AbstractProtocolEncoderOutput. Of course, I can extend AbstractProtocolEncoderOutput for my implementation but anyway it looks like workaround not a regular way.

I suppose ProtocolCodecFilter.filterWrite should look like this

        // ...
        try {
            // Now we can try to encode the response
            encoder.encode(session, message, encoderOut);
            
            // Send it directly
            if (encoderOut instanceof Flushable) {
                ((Flushable)encoderOut).flush();
            }            
            
            // Call the next filter
            nextFilter.filterWrite(session, new MessageWriteRequest(
                    writeRequest));
        } catch (Throwable t) {
            // ...
        } 

and ProtocolCodecFilter.getEncoderOut

     protected ProtocolEncoderOutput getEncoderOut(IoSession session,
        NextFilter nextFilter, WriteRequest writeRequest) {
        // default implementation which might be overridden in derived classes
        ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT);
        
        if (out == null) {
            // Create a new instance, and stores it into the session
            out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
            session.setAttribute(ENCODER_OUT, out);
        }
        
        return out;
    }


And several words about ConcurrentLinkedQueue in ProtocolCodecFilter again.

Emmanuel, early you wrote "We use a CLQ because you *may* have many messages written for i-one single session by many threads, if you add an executor in the chain."

I looked to Executors too. In case of OrderedThreadPoolExecutor all will work fine because it maintain events order per session, but if we insert UnorderedThreadPoolExecutor before ProtocolCodecFilter that might get us in trouble as described in this issue. Of course, you may say "if you set unordered executor then events order doesn't matter for you and it is ok that flushing distort order too". But there is interesting aspect here. Messages which are processed by executor (before-encode) and messages are written to nextFilter via ProtocolEncoderOutput (after-encode) might have different meaning. Back to my example, before-encode message is a video frame and order for entire frames doesn't matter, after-encode message is a video frame chunks and order of chunks do matter!

So, I just want to say, when I write to ProtocolEncoderOutput I expect that next filter receives messages in the same order as they were written (may be mixed with other messages, but relative order should be preserved) but flush may distort this relative order!


I understand this example may look quite artificial and I'm not sure it should be fixed in default implementation but such behavior must be reflected in doc and must be a regular way to provide custom implementation of ProtocolEncoderOutput for such cases.

> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Issue Comment Edited] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Ilya Ivanov (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072922#comment-13072922 ] 

Ilya Ivanov edited comment on DIRMINA-845 at 7/29/11 5:14 PM:
--------------------------------------------------------------

I understand, there are reasons to leave class partially thread-safe, but I think this should be reflected in java doc at least.

"What you are saying is that the frames coming from a session1 in the order (A,B) are pushed in this order to session 3, but are sometime transmitted in the reverse order (B,A), right?"

Not exactly but closer to true. A and B aren't two frames they are chunks of one frame. Chunk equals to message in MINA terms.
One thread splits frame onto chunks and writes in to output (chunks order is correct here). Another concurrent thread does the same with another frame but writes to the same output. All correct until both thread run flush on the same output.

public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws ProtocolCodecException { 
  // ...
  final IoBuffer buf = encoder.encode(state, message); 
  if (buf != null) { 
    Chunker chunker = new Chunker(buf, state.getWriteChunkSize(), 2048); 
    while (chunker.hasNext()) { 
      out.write(chunker.next()); 
    }
  }
  // ...
  out.flush(); // <- problem HERE
}

Here 'session' is session 3 and encode runs concurrently in session 1's thread and session 2's thread. 

      was (Author: ilya.a.ivanov):
    I understand, there are reasons to leave class partially thread-safe, but I think this should be reflected in java doc at least.

"What you are saying is that the frames coming from a session1 in the order (A,B) are pushed in this order to session 3, but are sometime transmitted in the reverse order (B,A), right?"

Not exactly but closer to true. A and B aren't two frames they are chunks of one frame. Chunk equals to message in MINA terms.
One thread splits frame onto chunks and writes in to output (chunks order is correct here). Another concurrent thread does the same with another frame but writes to the same output. All correct until both thread run flush on the same output.

public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws ProtocolCodecException { 
  // ...
  final IoBuffer buf = encoder.encode(state, message); 
  if (buf != null) { 
    Chunker chunker = new Chunker(buf, state.getWriteChunkSize(), 2048); 
    while (chunker.hasNext()) { 
      out.write(chunker.next()); 
    }
  }
  // ...
  out.flush(); // <- problem HERE
}
  
> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Luigi Bitonti (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13267479#comment-13267479 ] 

Luigi Bitonti commented on DIRMINA-845:
---------------------------------------

Hi Ilya,

Interesting report. would you be able to share some details of your solution?

Thanks,
Luigi 
                
> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Emmanuel Lecharny (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072936#comment-13072936 ] 

Emmanuel Lecharny commented on DIRMINA-845:
-------------------------------------------

I forgot to mention that, yes, the Javadoco sucks :/

Also note I'm not arguing with you about the issue, I really try to understand your use case, to see if MINA should be modified in a way or another, or if we just need to document better the code.

> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Ilya Ivanov (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072903#comment-13072903 ] 

Ilya Ivanov commented on DIRMINA-845:
-------------------------------------

In general, it isn't so important what exactly causes concurrent run of flush method, I just don't understand why ConcurrentLinkedQueue is used in ProtocolEncoderOutputImpl if it isn't made this class thread-safe?! 

One more example, concurrent run of write and mergeAll may lead to BufferOverflowException. 

> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Ilya Ivanov (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13175285#comment-13175285 ] 

Ilya Ivanov commented on DIRMINA-845:
-------------------------------------

Hi, Dominic!

I’ve read your post about deadlock issue. Really, we didn’t face the deadlock problem because we use asynchronous message dispatching.
I tried to eliminate locks in encode/decode by another reason but looks like my solution also might solve your problem too.

I sent your e-mail with some details.
                
> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

       

[jira] [Commented] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Ilya Ivanov (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072696#comment-13072696 ] 

Ilya Ivanov commented on DIRMINA-845:
-------------------------------------

Actually, it isn't my chain. I've found this problem when trying to optimize red5 media-server. 
AFAIU, in red5 "linear" chain is used, i.e. all filters are called in writer thread (there is no other executors).

> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Emmanuel Lecharny (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072911#comment-13072911 ] 

Emmanuel Lecharny commented on DIRMINA-845:
-------------------------------------------

We use a CLQ because you *may* have many messages written for i-one single session by many threads, if you add an executor in the chain. This queue is a guaranty that we don't lose messages.

Making this class thread safe would create a bottleneck which will be killing any heavily loaded server, when most of the time you don't have an executor. It's enough to make sure that each message has to be processed by the same session all the time. Or you can also use the OrderedThreadPoolExecutor which guarantees that message are processed in the right order when you add an executor.

> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Emmanuel Lecharny (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072944#comment-13072944 ] 

Emmanuel Lecharny commented on DIRMINA-845:
-------------------------------------------

this problem should be mitigated by modifying the codec filter, and making the queue thread safe.

The best solution I foresee would be to copy the current code, add a mutex around the queue to protect it, and use this filter in your chain. 

I won't be a MINA filter though, unless we add it as a ThreadSafeProtocolCodecFilter.

Currently, I see no other solution...

> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Emmanuel Lecharny (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072947#comment-13072947 ] 

Emmanuel Lecharny commented on DIRMINA-845:
-------------------------------------------

Ilya, this is a pretty damn good suggestion.

Being able to override the queue instead of sub-classing the whole filter would have saved you a lot of time.

We can work out such a modification in the MINA code base, making it easier to deal with thread-safeness. I'll try to patch the code, if you can test it (it will be in a branch) that would be great !

> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Ilya Ivanov (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072871#comment-13072871 ] 

Ilya Ivanov commented on DIRMINA-845:
-------------------------------------

Let suppose we have 3 session (3 rtmp connections) 1, 2 and 3. Session 1 receives video frame and re-sends it to session 3, session 2 receives another video frame and re-send it to session 3 too. All sessions have different io-processor threads.

So, two incoming messages processed by different threads will pass through session 3 filter chain concurrently and leads to above issue on flushing buffer queue in ProtocolCodecFilter.filterWrite.

I'm not sure this is right using of MINA or not but such implementation I saw in red5...

http://code.google.com/p/red5/source/browse/java/server/trunk/src/org/red5/server/net/rtmp/codec/RTMPMinaProtocolEncoder.java  lines 50 and 70

I tried to eliminate entire connection lock and replace it on per-channel lock (RTMP itself allows this).

> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Ilya Ivanov (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072938#comment-13072938 ] 

Ilya Ivanov commented on DIRMINA-845:
-------------------------------------

"The only reason you might send those message in the wrong order is that they are read from the queue by two different threads"

Exactly this case! flush itself read sequentially but two concurrent flush(es) are run at the moment because two threads writing to session 3.

> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Ilya Ivanov (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072922#comment-13072922 ] 

Ilya Ivanov commented on DIRMINA-845:
-------------------------------------

I understand, there are reasons to leave class partially thread-safe, but I think this should be reflected in java doc at least.

"What you are saying is that the frames coming from a session1 in the order (A,B) are pushed in this order to session 3, but are sometime transmitted in the reverse order (B,A), right?"

Not exactly but closer to true. A and B aren't two frames they are chunks of one frame. Chunk equals to message in MINA terms.
One thread splits frame onto chunks and writes in to output (chunks order is correct here). Another concurrent thread does the same with another frame but writes to the same output. All correct until both thread run flush on the same output.

public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws ProtocolCodecException { 
  // ...
  final IoBuffer buf = encoder.encode(state, message); 
  if (buf != null) { 
    Chunker chunker = new Chunker(buf, state.getWriteChunkSize(), 2048); 
    while (chunker.hasNext()) { 
      out.write(chunker.next()); 
    }
  }
  // ...
  out.flush(); // <- problem HERE
}

> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Emmanuel Lecharny (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072898#comment-13072898 ] 

Emmanuel Lecharny commented on DIRMINA-845:
-------------------------------------------

In your use case, you are assuming that frames coming from session 1 *and* from session 2 are in sync (ie the order they arrive must be respected).

This is not a MINA issue, it's up to your handler to deal with such a scenario. You have to order the frames before writing them to session 3. Ie, you should wait for frame 2A when receiving frame 2B before you can send 2B.

> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (DIRMINA-845) ProtocolEncoderOutputImpl isn't thread-safe

Posted by "Emmanuel Lecharny (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/DIRMINA-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13072932#comment-13072932 ] 

Emmanuel Lecharny commented on DIRMINA-845:
-------------------------------------------

Still puzzled...

If the chunks are written in the CLQ in the right order, there is no reason for those chunks to be send in another order, as the queue is read sequentially in the flush() method. 

The only reason you might send those message in the wrong order is that they are read from the queue by two different threads, which should not be the case...

Am I missing something ?

> ProtocolEncoderOutputImpl isn't thread-safe
> -------------------------------------------
>
>                 Key: DIRMINA-845
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-845
>             Project: MINA
>          Issue Type: Bug
>          Components: Filter
>    Affects Versions: 2.0.4
>            Reporter: Ilya Ivanov
>
> ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue and at first look it seems to be thread-safe. But really concurrent execution of flush method isn't thread-safe (and write-mergeAll also).
> E.g. in RTMP several channels multiplexed in single connection. According protocol specification it's possible to write to different channels concurrently. But it doesn't work with MINA.
> I've synchronized channel writing, but it doesn't prevent concurrent run of flushing (in 2.0.4 it's done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).
> Here the fragment of flushing code:
> while (!bufferQueue.isEmpty()) {
>   Object encodedMessage = bufferQueue.poll();
>                 
>   if (encodedMessage == null) {
>     break;
>   }
>   // Flush only when the buffer has remaining.
>   if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
>     SocketAddress destination = writeRequest.getDestination();
>     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
>     nextFilter.filterWrite(session, encodedWriteRequest);
>   }
> } 
> Suppose original packets sequence is A, B, ...
> Concurrent run of flushing may proceed as following:
> thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
> thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
> ...
> thread-2: nextFilter.filterWrite(...); // writes B packet
> thread-1: nextFilter.filterWrite(...); // writes A packet
> so, resulting sequence will B, A
> It's quite confusing result especially when documentation doesn't contain any explanation about such behavior.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira