Posted to commits@mina.apache.org by el...@apache.org on 2019/04/03 11:31:42 UTC
svn commit: r1856870 -
/mina/site/trunk/content/mina-project/technical-documentation/message-flow.mdtext
Author: elecharny
Date: Wed Apr 3 11:31:42 2019
New Revision: 1856870
URL: http://svn.apache.org/viewvc?rev=1856870&view=rev
Log:
Formatting
Modified:
mina/site/trunk/content/mina-project/technical-documentation/message-flow.mdtext
Modified: mina/site/trunk/content/mina-project/technical-documentation/message-flow.mdtext
URL: http://svn.apache.org/viewvc/mina/site/trunk/content/mina-project/technical-documentation/message-flow.mdtext?rev=1856870&r1=1856869&r2=1856870&view=diff
==============================================================================
--- mina/site/trunk/content/mina-project/technical-documentation/message-flow.mdtext (original)
+++ mina/site/trunk/content/mina-project/technical-documentation/message-flow.mdtext Wed Apr 3 11:31:42 2019
@@ -22,154 +22,154 @@ When a selector receives some _OP_READ_
Here is the representation of such a flow, where the _SslFilter_ has been included (we will come back to it later)
- ::: text
- +-------------------+
- | IoAcceptor |
- | +---------+ |
- | | Select | |
- | +----+----+ |
- | | |
- +---------+---------+
- . flush/
- +---------+ . +---------+ wakeup
- | Channel <-----read------+ . +-----write-----> Channel | +----+
- +---------+ | | | +---------+ | |
- +---+--v--+---+ +----v----+---+
- | | writeRequestQueue | |
- | IoProcessor | +-+-+-+-+-+-+-+-+ | IoProcessor |
- | <- pop -+ | | | | | | | <- put- | |
- +---+-----^---+ +-+-+-+-+-+-+-+-+ +---------^---+
- | . . |
- fireMessageReceived | . . | write
- | . . |
- +---v---------+ +---v-----+---+
- | HeadFilter | | HeadFilter <---------------filterWrite---------------+
- +---+-----^---+ +-------------+ |
- | . . +-----+------+
- MessageReceived | . . +----------flushScheduledEvents--------> SslHandler |
- | . . | +-----+------+
- +---v---------+ +---v------+--+ |
- | SslFilter | | SslFilter +----put----+ |
- +---+-----^---+ +---------^---+ | |
- | . . | | filterWriteQueue |
- MessageReceived | . . | filterWrite | +-+-+-+-+-+-+-+-+ |
- | . . | +--> | | | | | | | <----pop---+
- +---v---------+ +---v---------+ +-+-+-+-+-+-+-+-+
- | <filters> | | <filters> |
- +---+-----v---+ +-------------+
- | . . |
- MessageReceived | . . | filterWrite
- | . . |
- +---v---------+ +---v-----+---+
- | TailFilter | | TailFilter |
- +---+-----^---+ +---------^---+
- | . . |
- MessageReceived | . . | fireFilterWrite
- | . . |
- +---v---------+ . |
- | IoHandler | . |
- +---+-----^---+ . |
- | . . |
- | ......................................... |
- | |
- +-------------------sessionWrite--------------------+
+ :::text
+ +-------------------+
+ | IoAcceptor |
+ | +---------+ |
+ | | Select | |
+ | +----+----+ |
+ | | |
+ +---------+---------+
+ . flush/
+ +---------+ . +---------+ wakeup
+ | Channel <-----read------+ . +-----write-----> Channel | +----+
+ +---------+ | | | +---------+ | |
+ +---+--v--+---+ +----v----+---+
+ | | writeRequestQueue | |
+ | IoProcessor | +-+-+-+-+-+-+-+-+ | IoProcessor |
+ | <- pop -+ | | | | | | | <- put- | |
+ +---+-----^---+ +-+-+-+-+-+-+-+-+ +---------^---+
+ | . . |
+ fireMessageReceived | . . | write
+ | . . |
+ +---v---------+ +---v-----+---+
+ | HeadFilter | | HeadFilter <---------------filterWrite---------------+
+ +---+-----^---+ +-------------+ |
+ | . . +-----+------+
+ MessageReceived | . . +----------flushScheduledEvents--------> SslHandler |
+ | . . | +-----+------+
+ +---v---------+ +---v------+--+ |
+ | SslFilter | | SslFilter +----put----+ |
+ +---+-----^---+ +---------^---+ | |
+ | . . | | filterWriteQueue |
+ MessageReceived | . . | filterWrite | +-+-+-+-+-+-+-+-+ |
+ | . . | +--> | | | | | | | <----pop---+
+ +---v---------+ +---v---------+ +-+-+-+-+-+-+-+-+
+ | <filters> | | <filters> |
+ +---+-----v---+ +-------------+
+ | . . |
+ MessageReceived | . . | filterWrite
+ | . . |
+ +---v---------+ +---v-----+---+
+ | TailFilter | | TailFilter |
+ +---+-----^---+ +---------^---+
+ | . . |
+ MessageReceived | . . | fireFilterWrite
+ | . . |
+ +---v---------+ . |
+ | IoHandler | . |
+ +---+-----^---+ . |
+ | . . |
+ | ......................................... |
+ | |
+ +-------------------sessionWrite--------------------+
For instance, if we consider an application whose filter chain contains the _MdcInjectionFilter_, _SslFilter_, _CompressionFilter_, _ProtocolCodecFilter_ and _LoggingFilter_, we end up with a stack of 77 calls, which needs to be emptied before the response can eventually be written. Here is the stack (in reverse order, the latest call on top of it):
- ::: text
- NioProcessor(AbstractPollingIoProcessor<S>).flush(S) line: 449
- NioProcessor(AbstractPollingIoProcessor<S>).write(S, WriteRequest) line: 436
- NioProcessor(AbstractPollingIoProcessor<S>).write(IoSession, WriteRequest) line: 68
- SimpleIoProcessorPool<S>.write(S, WriteRequest) line: 278
- SimpleIoProcessorPool<S>.write(IoSession, WriteRequest) line: 80
- DefaultIoFilterChain$HeadFilter.filterWrite(IoFilter$NextFilter, IoSession, WriteRequest) line: 914
- DefaultIoFilterChain.callPreviousFilterWrite(IoFilterChain$Entry, IoSession, WriteRequest) line: 753
- DefaultIoFilterChain.access$1500(DefaultIoFilterChain, IoFilterChain$Entry, IoSession, WriteRequest) line: 49
- DefaultIoFilterChain$EntryImpl$1.filterWrite(IoSession, WriteRequest) line: 1146
- IoFilterEvent.fire() line: 131
- MdcInjectionFilter.filter(IoFilterEvent) line: 162
- MdcInjectionFilter(CommonEventFilter).filterWrite(IoFilter$NextFilter, IoSession, WriteRequest) line: 68
- DefaultIoFilterChain.callPreviousFilterWrite(IoFilterChain$Entry, IoSession, WriteRequest) line: 753
- DefaultIoFilterChain.access$1500(DefaultIoFilterChain, IoFilterChain$Entry, IoSession, WriteRequest) line: 49
- DefaultIoFilterChain$EntryImpl$1.filterWrite(IoSession, WriteRequest) line: 1146
- SslHandler.flushScheduledEvents() line: 330
- SslFilter.filterWrite(IoFilter$NextFilter, IoSession, WriteRequest) line: 671
- DefaultIoFilterChain.callPreviousFilterWrite(IoFilterChain$Entry, IoSession, WriteRequest) line: 753
- DefaultIoFilterChain.access$1500(DefaultIoFilterChain, IoFilterChain$Entry, IoSession, WriteRequest) line: 49
- DefaultIoFilterChain$EntryImpl$1.filterWrite(IoSession, WriteRequest) line: 1146
- CompressionFilter.filterWrite(IoFilter$NextFilter, IoSession, WriteRequest) line: 152
- DefaultIoFilterChain.callPreviousFilterWrite(IoFilterChain$Entry, IoSession, WriteRequest) line: 753
- DefaultIoFilterChain.access$1500(DefaultIoFilterChain, IoFilterChain$Entry, IoSession, WriteRequest) line: 49
- DefaultIoFilterChain$EntryImpl$1.filterWrite(IoSession, WriteRequest) line: 1146
- ProtocolCodecFilter.filterWrite(IoFilter$NextFilter, IoSession, WriteRequest) line: 340
- DefaultIoFilterChain.callPreviousFilterWrite(IoFilterChain$Entry, IoSession, WriteRequest) line: 753
- DefaultIoFilterChain.access$1500(DefaultIoFilterChain, IoFilterChain$Entry, IoSession, WriteRequest) line: 49
- DefaultIoFilterChain$EntryImpl$1.filterWrite(IoSession, WriteRequest) line: 1146
- LoggingFilter(IoFilterAdapter).filterWrite(IoFilter$NextFilter, IoSession, WriteRequest) line: 138
- DefaultIoFilterChain.callPreviousFilterWrite(IoFilterChain$Entry, IoSession, WriteRequest) line: 753
- DefaultIoFilterChain.access$1500(DefaultIoFilterChain, IoFilterChain$Entry, IoSession, WriteRequest) line: 49
- DefaultIoFilterChain$EntryImpl$1.filterWrite(IoSession, WriteRequest) line: 1146
- DefaultIoFilterChain$TailFilter(IoFilterAdapter).filterWrite(IoFilter$NextFilter, IoSession, WriteRequest) line: 138
- DefaultIoFilterChain.callPreviousFilterWrite(IoFilterChain$Entry, IoSession, WriteRequest) line: 753
- DefaultIoFilterChain.fireFilterWrite(WriteRequest) line: 746
- NioSocketSession(AbstractIoSession).write(Object, SocketAddress) line: 570
- NioSocketSession(AbstractIoSession).write(Object) line: 515
- ChatProtocolHandler.messageReceived(IoSession, Object) line: 106
- DefaultIoFilterChain$TailFilter.messageReceived(IoFilter$NextFilter, IoSession, Object) line: 1015
- DefaultIoFilterChain.callNextMessageReceived(IoFilterChain$Entry, IoSession, Object) line: 650
- DefaultIoFilterChain.access$1300(DefaultIoFilterChain, IoFilterChain$Entry, IoSession, Object) line: 49
- DefaultIoFilterChain$EntryImpl$1.messageReceived(IoSession, Object) line: 1128
- LoggingFilter.messageReceived(IoFilter$NextFilter, IoSession, Object) line: 208
- DefaultIoFilterChain.callNextMessageReceived(IoFilterChain$Entry, IoSession, Object) line: 650
- DefaultIoFilterChain.access$1300(DefaultIoFilterChain, IoFilterChain$Entry, IoSession, Object) line: 49
- DefaultIoFilterChain$EntryImpl$1.messageReceived(IoSession, Object) line: 1128
- ProtocolCodecFilter$ProtocolDecoderOutputImpl.flush(IoFilter$NextFilter, IoSession) line: 413
- ProtocolCodecFilter.messageReceived(IoFilter$NextFilter, IoSession, Object) line: 257
- DefaultIoFilterChain.callNextMessageReceived(IoFilterChain$Entry, IoSession, Object) line: 650
- DefaultIoFilterChain.access$1300(DefaultIoFilterChain, IoFilterChain$Entry, IoSession, Object) line: 49
- DefaultIoFilterChain$EntryImpl$1.messageReceived(IoSession, Object) line: 1128
- CompressionFilter.messageReceived(IoFilter$NextFilter, IoSession, Object) line: 169
- DefaultIoFilterChain.callNextMessageReceived(IoFilterChain$Entry, IoSession, Object) line: 650
- DefaultIoFilterChain.access$1300(DefaultIoFilterChain, IoFilterChain$Entry, IoSession, Object) line: 49
- DefaultIoFilterChain$EntryImpl$1.messageReceived(IoSession, Object) line: 1128
- SslHandler.flushScheduledEvents() line: 335
- SslFilter.messageReceived(IoFilter$NextFilter, IoSession, Object) line: 553
- DefaultIoFilterChain.callNextMessageReceived(IoFilterChain$Entry, IoSession, Object) line: 650
- DefaultIoFilterChain.access$1300(DefaultIoFilterChain, IoFilterChain$Entry, IoSession, Object) line: 49
- DefaultIoFilterChain$EntryImpl$1.messageReceived(IoSession, Object) line: 1128
- IoFilterEvent.fire() line: 105
- MdcInjectionFilter.filter(IoFilterEvent) line: 162
- MdcInjectionFilter(CommonEventFilter).messageReceived(IoFilter$NextFilter, IoSession, Object) line: 84
- DefaultIoFilterChain.callNextMessageReceived(IoFilterChain$Entry, IoSession, Object) line: 650
- DefaultIoFilterChain.access$1300(DefaultIoFilterChain, IoFilterChain$Entry, IoSession, Object) line: 49
- DefaultIoFilterChain$EntryImpl$1.messageReceived(IoSession, Object) line: 1128
- DefaultIoFilterChain$HeadFilter(IoFilterAdapter).messageReceived(IoFilter$NextFilter, IoSession, Object) line: 122
- DefaultIoFilterChain.callNextMessageReceived(IoFilterChain$Entry, IoSession, Object) line: 650
- DefaultIoFilterChain.fireMessageReceived(Object) line: 643
- NioProcessor(AbstractPollingIoProcessor<S>).read(S) line: 539
- AbstractPollingIoProcessor<S>.access$1200(AbstractPollingIoProcessor, AbstractIoSession) line: 68
- AbstractPollingIoProcessor$Processor.process(S) line: 1223
- AbstractPollingIoProcessor$Processor.process() line: 1212
- AbstractPollingIoProcessor$Processor.run() line: 683
- NamePreservingRunnable.run() line: 64
- ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1149
- ThreadPoolExecutor$Worker.run() line: 624
- Thread.run() line: 748
+ :::text
+ NioProcessor(AbstractPollingIoProcessor<S>).flush(S) line: 449
+ NioProcessor(AbstractPollingIoProcessor<S>).write(S, WriteRequest) line: 436
+ NioProcessor(AbstractPollingIoProcessor<S>).write(IoSession, WriteRequest) line: 68
+ SimpleIoProcessorPool<S>.write(S, WriteRequest) line: 278
+ SimpleIoProcessorPool<S>.write(IoSession, WriteRequest) line: 80
+ DefaultIoFilterChain$HeadFilter.filterWrite(IoFilter$NextFilter, IoSession, WriteRequest) line: 914
+ DefaultIoFilterChain.callPreviousFilterWrite(IoFilterChain$Entry, IoSession, WriteRequest) line: 753
+ DefaultIoFilterChain.access$1500(DefaultIoFilterChain, IoFilterChain$Entry, IoSession, WriteRequest) line: 49
+ DefaultIoFilterChain$EntryImpl$1.filterWrite(IoSession, WriteRequest) line: 1146
+ IoFilterEvent.fire() line: 131
+ MdcInjectionFilter.filter(IoFilterEvent) line: 162
+ MdcInjectionFilter(CommonEventFilter).filterWrite(IoFilter$NextFilter, IoSession, WriteRequest) line: 68
+ DefaultIoFilterChain.callPreviousFilterWrite(IoFilterChain$Entry, IoSession, WriteRequest) line: 753
+ DefaultIoFilterChain.access$1500(DefaultIoFilterChain, IoFilterChain$Entry, IoSession, WriteRequest) line: 49
+ DefaultIoFilterChain$EntryImpl$1.filterWrite(IoSession, WriteRequest) line: 1146
+ SslHandler.flushScheduledEvents() line: 330
+ SslFilter.filterWrite(IoFilter$NextFilter, IoSession, WriteRequest) line: 671
+ DefaultIoFilterChain.callPreviousFilterWrite(IoFilterChain$Entry, IoSession, WriteRequest) line: 753
+ DefaultIoFilterChain.access$1500(DefaultIoFilterChain, IoFilterChain$Entry, IoSession, WriteRequest) line: 49
+ DefaultIoFilterChain$EntryImpl$1.filterWrite(IoSession, WriteRequest) line: 1146
+ CompressionFilter.filterWrite(IoFilter$NextFilter, IoSession, WriteRequest) line: 152
+ DefaultIoFilterChain.callPreviousFilterWrite(IoFilterChain$Entry, IoSession, WriteRequest) line: 753
+ DefaultIoFilterChain.access$1500(DefaultIoFilterChain, IoFilterChain$Entry, IoSession, WriteRequest) line: 49
+ DefaultIoFilterChain$EntryImpl$1.filterWrite(IoSession, WriteRequest) line: 1146
+ ProtocolCodecFilter.filterWrite(IoFilter$NextFilter, IoSession, WriteRequest) line: 340
+ DefaultIoFilterChain.callPreviousFilterWrite(IoFilterChain$Entry, IoSession, WriteRequest) line: 753
+ DefaultIoFilterChain.access$1500(DefaultIoFilterChain, IoFilterChain$Entry, IoSession, WriteRequest) line: 49
+ DefaultIoFilterChain$EntryImpl$1.filterWrite(IoSession, WriteRequest) line: 1146
+ LoggingFilter(IoFilterAdapter).filterWrite(IoFilter$NextFilter, IoSession, WriteRequest) line: 138
+ DefaultIoFilterChain.callPreviousFilterWrite(IoFilterChain$Entry, IoSession, WriteRequest) line: 753
+ DefaultIoFilterChain.access$1500(DefaultIoFilterChain, IoFilterChain$Entry, IoSession, WriteRequest) line: 49
+ DefaultIoFilterChain$EntryImpl$1.filterWrite(IoSession, WriteRequest) line: 1146
+ DefaultIoFilterChain$TailFilter(IoFilterAdapter).filterWrite(IoFilter$NextFilter, IoSession, WriteRequest) line: 138
+ DefaultIoFilterChain.callPreviousFilterWrite(IoFilterChain$Entry, IoSession, WriteRequest) line: 753
+ DefaultIoFilterChain.fireFilterWrite(WriteRequest) line: 746
+ NioSocketSession(AbstractIoSession).write(Object, SocketAddress) line: 570
+ NioSocketSession(AbstractIoSession).write(Object) line: 515
+ ChatProtocolHandler.messageReceived(IoSession, Object) line: 106
+ DefaultIoFilterChain$TailFilter.messageReceived(IoFilter$NextFilter, IoSession, Object) line: 1015
+ DefaultIoFilterChain.callNextMessageReceived(IoFilterChain$Entry, IoSession, Object) line: 650
+ DefaultIoFilterChain.access$1300(DefaultIoFilterChain, IoFilterChain$Entry, IoSession, Object) line: 49
+ DefaultIoFilterChain$EntryImpl$1.messageReceived(IoSession, Object) line: 1128
+ LoggingFilter.messageReceived(IoFilter$NextFilter, IoSession, Object) line: 208
+ DefaultIoFilterChain.callNextMessageReceived(IoFilterChain$Entry, IoSession, Object) line: 650
+ DefaultIoFilterChain.access$1300(DefaultIoFilterChain, IoFilterChain$Entry, IoSession, Object) line: 49
+ DefaultIoFilterChain$EntryImpl$1.messageReceived(IoSession, Object) line: 1128
+ ProtocolCodecFilter$ProtocolDecoderOutputImpl.flush(IoFilter$NextFilter, IoSession) line: 413
+ ProtocolCodecFilter.messageReceived(IoFilter$NextFilter, IoSession, Object) line: 257
+ DefaultIoFilterChain.callNextMessageReceived(IoFilterChain$Entry, IoSession, Object) line: 650
+ DefaultIoFilterChain.access$1300(DefaultIoFilterChain, IoFilterChain$Entry, IoSession, Object) line: 49
+ DefaultIoFilterChain$EntryImpl$1.messageReceived(IoSession, Object) line: 1128
+ CompressionFilter.messageReceived(IoFilter$NextFilter, IoSession, Object) line: 169
+ DefaultIoFilterChain.callNextMessageReceived(IoFilterChain$Entry, IoSession, Object) line: 650
+ DefaultIoFilterChain.access$1300(DefaultIoFilterChain, IoFilterChain$Entry, IoSession, Object) line: 49
+ DefaultIoFilterChain$EntryImpl$1.messageReceived(IoSession, Object) line: 1128
+ SslHandler.flushScheduledEvents() line: 335
+ SslFilter.messageReceived(IoFilter$NextFilter, IoSession, Object) line: 553
+ DefaultIoFilterChain.callNextMessageReceived(IoFilterChain$Entry, IoSession, Object) line: 650
+ DefaultIoFilterChain.access$1300(DefaultIoFilterChain, IoFilterChain$Entry, IoSession, Object) line: 49
+ DefaultIoFilterChain$EntryImpl$1.messageReceived(IoSession, Object) line: 1128
+ IoFilterEvent.fire() line: 105
+ MdcInjectionFilter.filter(IoFilterEvent) line: 162
+ MdcInjectionFilter(CommonEventFilter).messageReceived(IoFilter$NextFilter, IoSession, Object) line: 84
+ DefaultIoFilterChain.callNextMessageReceived(IoFilterChain$Entry, IoSession, Object) line: 650
+ DefaultIoFilterChain.access$1300(DefaultIoFilterChain, IoFilterChain$Entry, IoSession, Object) line: 49
+ DefaultIoFilterChain$EntryImpl$1.messageReceived(IoSession, Object) line: 1128
+ DefaultIoFilterChain$HeadFilter(IoFilterAdapter).messageReceived(IoFilter$NextFilter, IoSession, Object) line: 122
+ DefaultIoFilterChain.callNextMessageReceived(IoFilterChain$Entry, IoSession, Object) line: 650
+ DefaultIoFilterChain.fireMessageReceived(Object) line: 643
+ NioProcessor(AbstractPollingIoProcessor<S>).read(S) line: 539
+ AbstractPollingIoProcessor<S>.access$1200(AbstractPollingIoProcessor, AbstractIoSession) line: 68
+ AbstractPollingIoProcessor$Processor.process(S) line: 1223
+ AbstractPollingIoProcessor$Processor.process() line: 1212
+ AbstractPollingIoProcessor$Processor.run() line: 683
+ NamePreservingRunnable.run() line: 64
+ ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1149
+ ThreadPoolExecutor$Worker.run() line: 624
+ Thread.run() line: 748
Writing the data is done when we are back in the AbstractPollingIoProcessor$Processor.run() call. Here is the stack for that action:
- ::: text
- NioProcessor.write(NioSession, IoBuffer, int) line: 384
- NioProcessor.write(AbstractIoSession, IoBuffer, int) line: 47
- AbstractPollingIoProcessor$Processor.writeBuffer(S, WriteRequest, boolean, int, long) line: 1107
- AbstractPollingIoProcessor$Processor.flushNow(S, long) line: 994
- AbstractPollingIoProcessor$Processor.flush(long) line: 921
- AbstractPollingIoProcessor$Processor.run() line: 688
- NamePreservingRunnable.run() line: 64
- ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1149
- ThreadPoolExecutor$Worker.run() line: 624
- Thread.run() line: 748
+ :::text
+ NioProcessor.write(NioSession, IoBuffer, int) line: 384
+ NioProcessor.write(AbstractIoSession, IoBuffer, int) line: 47
+ AbstractPollingIoProcessor$Processor.writeBuffer(S, WriteRequest, boolean, int, long) line: 1107
+ AbstractPollingIoProcessor$Processor.flushNow(S, long) line: 994
+ AbstractPollingIoProcessor$Processor.flush(long) line: 921
+ AbstractPollingIoProcessor$Processor.run() line: 688
+ NamePreservingRunnable.run() line: 64
+ ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1149
+ ThreadPoolExecutor$Worker.run() line: 624
+ Thread.run() line: 748
## The SslFilter implementation
@@ -196,11 +196,11 @@ Bottom line, this is a application issue
That being said, we can use the _IoProcessor_ as a controller. It would be its responsibility to propagate events from one filter to the other, using such a loop:
- ::: text
- for each filter in the chain
- do
- propagate the event to the filter with the previous result
- store the result
- done
+ :::text
+ for each filter in the chain
+ do
+ propagate the event to the filter with the previous result
+ store the result
+ done
What we call the 'result' is just the transformed data. Typically, when calling the _ProtocolCodecFilter.messageReceived_ event, the result will be the decoded message, not the _IoBuffer_ (or whatever) input. This will be passed to the next filter (remember that the _IoFilter.messageReceived_ method takes an _Object_ as a parameter).
\ No newline at end of file
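
The controller loop quoted in the last hunk could be sketched in Java roughly as follows. This is only an illustration under stated assumptions: `ControllerLoopSketch`, `SimpleFilter`, `propagate` and `demo` are hypothetical names invented for this sketch and are not part of the MINA API. The point is that each filter returns its transformed data to the controller instead of calling the next filter itself, so the call stack stays flat.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: none of these types exist in MINA. */
final class ControllerLoopSketch {

    /** Assumed filter contract: receives the previous result, returns the transformed one. */
    interface SimpleFilter {
        Object messageReceived(Object message);
    }

    /** The controller loop: every filter is called from here, never from another filter. */
    static Object propagate(List<SimpleFilter> chain, Object input) {
        Object result = input;
        for (SimpleFilter filter : chain) {
            // Propagate the event to the filter with the previous result, then store the result.
            result = filter.messageReceived(result);
        }
        return result;
    }

    /** Small usage example: a "decoder" turns bytes into a String, then a second filter trims it. */
    static Object demo() {
        SimpleFilter decoder = m -> new String((byte[]) m, StandardCharsets.UTF_8);
        SimpleFilter trimmer = m -> ((String) m).trim();
        byte[] raw = " hello \n".getBytes(StandardCharsets.UTF_8);
        return propagate(Arrays.asList(decoder, trimmer), raw);
    }
}
```

With this shape, the stack depth while a filter runs is constant (controller plus one filter) regardless of how many filters the chain contains. The sketch ignores everything a real chain would need, such as filters that produce zero or several results, the write direction, and session state.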