Posted to dev@mina.apache.org by Luis Neves <lu...@co.sapo.pt> on 2007/07/22 13:42:50 UTC

Messages read while iosession is suspended.

Hello all.
I think I'm misunderstanding something fundamental about how the
IoSession.suspendRead() and IoSession.resumeRead() methods work, and I would
appreciate any insight you can offer.
I have a producer and a consumer, both based on Mina. What I want to achieve is to
block the producer while the consumer is still processing a message.

The way I thought I could do this was to do something like this on the producer side:

[...]
WriteFuture wf = ioSession.write(message);
wf.awaitUninterruptibly();
[...]

and on the consumer side:

public void messageReceived(IoSession iosession, Object message) throws Exception
{
    long readMsg = iosession.getReadMessages();
    iosession.suspendRead();
    [... slow message processing ...]
    iosession.resumeRead();
}

I was under the impression that iosession.suspendRead() would make the
producer block, but this is not the behaviour I'm seeing.
What I'm seeing is that even with the IoSession in the suspended state, the
producer manages to write the messages successfully, and the value of
iosession.getReadMessages() on the consumer side keeps growing.
Are my expectations wrong? Is there any way I can achieve the desired behaviour
with Mina?

I'm using a very recent trunk snapshot.

Thanks.

--
Luis Neves
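
One way to get the blocking behaviour asked about above, independent of what
suspendRead() does, is to make the acknowledgement explicit at the application
level: the producer may have at most N unacknowledged messages outstanding and
waits for the consumer to confirm each one. The following is a minimal
in-process sketch of that idea using only java.util.concurrent, not Mina API.
The class name AckWindowSketch, the run() helper and the queue standing in for
the connection are all hypothetical; over a real connection the permit release
would be triggered by a small ack message sent back by the consumer.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

/** In-process model of "send, then wait for the ack" flow control (hypothetical names). */
class AckWindowSketch {

    /**
     * Sends {@code total} messages while allowing at most {@code window}
     * unacknowledged messages, and returns the highest in-flight count observed.
     */
    static int run(int total, int window) {
        BlockingQueue<String> wire = new LinkedBlockingQueue<>(); // stands in for the connection
        Semaphore permits = new Semaphore(window);                // one permit per unacked message
        AtomicInteger inFlight = new AtomicInteger();
        int maxInFlight = 0;

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) {
                    wire.take();                // plays the role of messageReceived()
                    Thread.sleep(2);            // slow message processing
                    inFlight.decrementAndGet();
                    permits.release();          // the "ack" that unblocks the producer
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();

        try {
            for (int i = 0; i < total; i++) {
                permits.acquire();              // blocks while the window is full
                maxInFlight = Math.max(maxInFlight, inFlight.incrementAndGet());
                wire.put("content-" + i);
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while sending", e);
        }
        return maxInFlight;
    }

    public static void main(String[] args) {
        System.out.println("max in flight: " + run(20, 1));
    }
}
```

With a window of 1 the producer never gets ahead of the consumer by more than
one message; a larger window trades strictness for throughput.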

Re: Messages read while iosession is suspended.

Posted by Trustin Lee <tr...@gmail.com>.
Hi Luis,

I guess it works OK now.  I found that your producer code doesn't generate
enough traffic, so I removed sleep(500); from the source code and
replaced the suspendRead and resumeRead calls with
ReadThrottleFilterBuilder.

What's somewhat weird is that the producer doesn't start to increase
its scheduledWriteMessages as soon as the socket receive buffer
becomes full.  I think it is related to the internal implementation
of the TCP/IP stack.

Let me paste my source code here:

package net.gleamynode.tmp;

import java.net.InetSocketAddress;

import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.filter.traffic.ReadThrottleFilterBuilder;
import org.apache.mina.transport.socket.nio.SocketAcceptor;

public class Consumer {
    public static void main(String[] args) throws Exception {
        SocketAcceptor acceptor = new SocketAcceptor();
        acceptor.getFilterChain().addLast(
                "executor",
                new ExecutorFilter());
        acceptor.getFilterChain().addLast(
                "codec",
                new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));

        ReadThrottleFilterBuilder rtfb = new ReadThrottleFilterBuilder();
        rtfb.setMaximumConnectionBufferSize(64);
        rtfb.attach(acceptor.getFilterChain());

        acceptor.setLocalAddress(new InetSocketAddress(7777));
        acceptor.getSessionConfig().setReceiveBufferSize(128);
        acceptor.setHandler(new ConsumerHandler());
        acceptor.bind();
    }
}

----

package net.gleamynode.tmp;

import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;

public class ConsumerHandler extends IoHandlerAdapter {

    public void messageReceived(IoSession session, Object message)
            throws Exception {
        System.out.println("Consumer.getReadMessages(): "
                + session.getReadMessages());
        System.out.println((String) message);
        Thread.sleep(3000);
    }
}

----

package net.gleamynode.tmp;

import java.net.InetSocketAddress;

import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
import org.apache.mina.transport.socket.nio.SocketConnector;

public class Producer {
    public static void main(String[] args) throws Exception {

        SocketConnector connector = new SocketConnector();
        connector.getFilterChain().addLast(
                "codec",
                new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
        connector.setHandler(new IoHandlerAdapter());

        IoSession session = connector.connect(
                new InetSocketAddress("127.0.0.1", 7777)).await().getSession();
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            System.out.println("Producer.getScheduledWriteMessages(): "
                    + session.getScheduledWriteMessages());
            String message = "content-" + i;
            System.out.println(message);
            session.write(message).await(500);
            if (session.isClosing()) break;
        }
    }
}

----

HTH,
Trustin
-- 
what we call human nature is actually human habit
--
http://gleamynode.net/
--
PGP Key ID: 0x0255ECA6
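
The delay noted above, where the producer's scheduledWriteMessages does not
grow as soon as the receive buffer fills, is at least consistent with how
kernel socket buffers behave in general: bytes are first absorbed by the
sender's own send buffer and by the receiver's receive buffer, and only when
both are full does a non-blocking write start returning 0. The sketch below
uses plain java.nio over loopback rather than Mina, so it illustrates the
general effect and is not a diagnosis of what happened in this test.
SocketBufferSketch and bytesAcceptedBeforeStall() are hypothetical names, and
the 128-byte receive buffer is only a hint that the operating system may
round up.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/** Measures how much a writer can push at a peer that never reads (hypothetical names). */
class SocketBufferSketch {

    /** Returns the number of bytes accepted before a non-blocking write() first returns 0. */
    static long bytesAcceptedBeforeStall() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Request a tiny receive buffer for accepted sockets; the OS may enlarge it.
            server.setOption(StandardSocketOptions.SO_RCVBUF, 128);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            try (SocketChannel writer = SocketChannel.open(server.getLocalAddress());
                 SocketChannel idleReader = server.accept()) {
                // idleReader is accepted but never read from, like a suspended consumer.
                writer.configureBlocking(false);
                ByteBuffer chunk = ByteBuffer.allocate(1024);
                long total = 0;
                while (true) {
                    chunk.clear();               // resend the same 1024 zero bytes
                    int written = writer.write(chunk);
                    if (written == 0) {
                        return total;            // kernel buffers are full: the writer stalls
                    }
                    total += written;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bytes accepted before stall: " + bytesAcceptedBeforeStall());
    }
}
```

The number printed varies by operating system and settings, so no particular
value should be expected.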

Re: Messages read while iosession is suspended.

Posted by Trustin Lee <tr...@gmail.com>.
I guess you need to use ReadThrottleFilterBuilder in this case.
Unfortunately, I was unable to fix your problem even with
ReadThrottleFilterBuilder.  Let me investigate this issue further this
weekend.

Thanks,
Trustin





Re: Messages read while iosession is suspended.

Posted by Luis Neves <lu...@co.sapo.pt>.
Hi Trustin,

Trustin Lee wrote:
> On 7/24/07, Luis Neves <lu...@co.sapo.pt> wrote:
>> Trustin Lee wrote:


>> humm... perhaps.
>> I've been experimenting to try to find a way to solve my problem, and I've
>> found that if the producer sends a bunch of messages and then shuts down, the
>> messageReceived() of the consumer keeps getting called long after the
>> producer is dead. It looks like the messages are held in the consumer's
>> internal (Mina) queue. What I think would also help me in this particular
>> case is the ability to specify the maximum number of messages in the
>> consumer queue... right? Is there any way to do that?
> 
> I created a JIRA issue related to the problem you are describing:
> 
> https://issues.apache.org/jira/browse/DIRMINA-405
> 
> I have already checked in the fix.  Please try the trunk and let me
> know if it does the job for you.  I changed ProtocolCodecFilter not to
> fire the messageReceived event and to hold decoded messages until read is
> resumed.

As far as I can tell, the fix doesn't solve my specific problem: Mina is
still accepting messages into its internal queues after the suspendRead() call.
suspendRead() behaves more like suspendNotification().
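
The request quoted above, a cap on the number of messages held in the
consumer's queue, amounts to replacing an unbounded inbox with a bounded one
and stopping reads when it is full. A minimal sketch of that idea with
java.util.concurrent follows; it is not how Mina is implemented, and
BoundedInboxSketch and acceptedWithoutConsumer() are hypothetical names used
only for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Models a consumer-side inbox with a hard upper bound (hypothetical names). */
class BoundedInboxSketch {

    /**
     * Offers {@code incoming} messages to an inbox of the given capacity that
     * nobody drains, and returns how many were accepted before it filled up.
     */
    static int acceptedWithoutConsumer(int incoming, int capacity) {
        BlockingQueue<String> inbox = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < incoming; i++) {
            // offer() refuses instead of growing the queue once capacity is reached.
            if (!inbox.offer("content-" + i)) {
                break; // a transport would stop reading from the socket at this point
            }
            accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("accepted: " + acceptedWithoutConsumer(100, 16));
    }
}
```

Once the inbox refuses a message, unread bytes stay in the socket buffers, and
that is what eventually lets the sender notice the slowdown.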

Using the consumer and producer code below, I would have expected the number in
"content-#" to closely track the number in "Consumer.getReadMessages(): #", but
this is not what happens. I also expected "Producer.getScheduledWriteMessages(): #"
to increase because of the read suspension on the consumer, but of course, since
the consumer accepts the messages, this doesn't happen.


Consumer code:

public void messageReceived(IoSession iosession, Object message) throws Exception
{
    iosession.suspendRead();
    System.out.println("Consumer.getReadMessages(): " + iosession.getReadMessages());
    iosession.suspendRead();
    System.out.println((String) message);
    Thread.sleep(3000);
    iosession.resumeRead();
}

Producer code:

for (int i = 0; i < 1000; i++)
{
    System.out.println("Producer.getScheduledWriteMessages(): " +
            ioSession.getScheduledWriteMessages());
    String message = "content-" + i;
    System.out.println(message);
    ioSession.write(message).awaitUninterruptibly();
    Thread.sleep(500);
}

This is the output from the Consumer:

Consumer.getReadMessages(): 1
message: content-0
Consumer.getReadMessages(): 6
message: content-1
Consumer.getReadMessages(): 12
message: content-2
Consumer.getReadMessages(): 18
message: content-3
Consumer.getReadMessages(): 18
message: content-4
Consumer.getReadMessages(): 18
message: content-5
Consumer.getReadMessages(): 18
message: content-6
Consumer.getReadMessages(): 18
message: content-7
Consumer.getReadMessages(): 18
message: content-8
Consumer.getReadMessages(): 18
message: content-9
Consumer.getReadMessages(): 18
message: content-10
Consumer.getReadMessages(): 66
message: content-11
Consumer.getReadMessages(): 72
message: content-12
Consumer.getReadMessages(): 72
message: content-13
Consumer.getReadMessages(): 84
message: content-14
Consumer.getReadMessages(): 90
message: content-15
Consumer.getReadMessages(): 96
message: content-16
Consumer.getReadMessages(): 102
message: content-17
Consumer.getReadMessages(): 102
message: content-18
Consumer.getReadMessages(): 102
message: content-19
Consumer.getReadMessages(): 102
message: content-20
Consumer.getReadMessages(): 102
message: content-21
Consumer.getReadMessages(): 102
message: content-22
Consumer.getReadMessages(): 102
message: content-23
Consumer.getReadMessages(): 102
message: content-24
Consumer.getReadMessages(): 102
message: content-25
Consumer.getReadMessages(): 102
message: content-26
Consumer.getReadMessages(): 102
message: content-27
Consumer.getReadMessages(): 102
message: content-28
Consumer.getReadMessages(): 102
message: content-29
Consumer.getReadMessages(): 102
message: content-30
Consumer.getReadMessages(): 102
message: content-31
Consumer.getReadMessages(): 102
message: content-32
Consumer.getReadMessages(): 102
message: content-33
Consumer.getReadMessages(): 102
message: content-34
Consumer.getReadMessages(): 102
message: content-35
Consumer.getReadMessages(): 214
message: content-36



And this is the output of the Producer:

Producer.getScheduledWriteMessages(): 0
message: content-0
Producer.getScheduledWriteMessages(): 0
message: content-1
Producer.getScheduledWriteMessages(): 0
message: content-2
Producer.getScheduledWriteMessages(): 0
message: content-3
Producer.getScheduledWriteMessages(): 0
message: content-4
Producer.getScheduledWriteMessages(): 0
message: content-5
Producer.getScheduledWriteMessages(): 0
message: content-6
Producer.getScheduledWriteMessages(): 0
message: content-7
Producer.getScheduledWriteMessages(): 0
message: content-8
Producer.getScheduledWriteMessages(): 0
message: content-9
Producer.getScheduledWriteMessages(): 0
message: content-10
Producer.getScheduledWriteMessages(): 0
message: content-11
Producer.getScheduledWriteMessages(): 0
message: content-12
Producer.getScheduledWriteMessages(): 0
message: content-13
Producer.getScheduledWriteMessages(): 0
message: content-14
Producer.getScheduledWriteMessages(): 0
message: content-15
Producer.getScheduledWriteMessages(): 0
message: content-16
Producer.getScheduledWriteMessages(): 0
message: content-17
Producer.getScheduledWriteMessages(): 0
message: content-18
Producer.getScheduledWriteMessages(): 0
message: content-19
Producer.getScheduledWriteMessages(): 0
message: content-20
Producer.getScheduledWriteMessages(): 0
message: content-21
Producer.getScheduledWriteMessages(): 0
message: content-22
Producer.getScheduledWriteMessages(): 0
message: content-23
Producer.getScheduledWriteMessages(): 0
message: content-24
Producer.getScheduledWriteMessages(): 0
message: content-25
Producer.getScheduledWriteMessages(): 0
message: content-26
Producer.getScheduledWriteMessages(): 0
message: content-27
Producer.getScheduledWriteMessages(): 0
message: content-28
Producer.getScheduledWriteMessages(): 0
message: content-29
Producer.getScheduledWriteMessages(): 0
message: content-30
Producer.getScheduledWriteMessages(): 0
message: content-31
Producer.getScheduledWriteMessages(): 0
message: content-32
Producer.getScheduledWriteMessages(): 0
message: content-33
Producer.getScheduledWriteMessages(): 0
message: content-34
Producer.getScheduledWriteMessages(): 0
message: content-35
Producer.getScheduledWriteMessages(): 0
message: content-36
Producer.getScheduledWriteMessages(): 0
message: content-37
Producer.getScheduledWriteMessages(): 0
message: content-38
Producer.getScheduledWriteMessages(): 0
message: content-39
Producer.getScheduledWriteMessages(): 0
message: content-40
Producer.getScheduledWriteMessages(): 0
message: content-41
Producer.getScheduledWriteMessages(): 0
message: content-42
Producer.getScheduledWriteMessages(): 0
message: content-43
Producer.getScheduledWriteMessages(): 0
message: content-44
Producer.getScheduledWriteMessages(): 0
message: content-45
Producer.getScheduledWriteMessages(): 0
message: content-46
Producer.getScheduledWriteMessages(): 0
message: content-47
Producer.getScheduledWriteMessages(): 0
message: content-48
Producer.getScheduledWriteMessages(): 0
message: content-49
Producer.getScheduledWriteMessages(): 0
message: content-50
Producer.getScheduledWriteMessages(): 0
message: content-51
Producer.getScheduledWriteMessages(): 0
message: content-52
Producer.getScheduledWriteMessages(): 0
message: content-53
Producer.getScheduledWriteMessages(): 0
message: content-54
Producer.getScheduledWriteMessages(): 0
message: content-55
Producer.getScheduledWriteMessages(): 0
message: content-56
Producer.getScheduledWriteMessages(): 0
message: content-57
Producer.getScheduledWriteMessages(): 0
message: content-58
Producer.getScheduledWriteMessages(): 0
message: content-59
Producer.getScheduledWriteMessages(): 0
message: content-60
Producer.getScheduledWriteMessages(): 0
message: content-61
Producer.getScheduledWriteMessages(): 0
message: content-62
Producer.getScheduledWriteMessages(): 0
message: content-63
Producer.getScheduledWriteMessages(): 0
message: content-64
Producer.getScheduledWriteMessages(): 0
message: content-65
Producer.getScheduledWriteMessages(): 0
message: content-66
Producer.getScheduledWriteMessages(): 0
message: content-67
Producer.getScheduledWriteMessages(): 0
message: content-68
Producer.getScheduledWriteMessages(): 0
message: content-69
Producer.getScheduledWriteMessages(): 0
message: content-70
Producer.getScheduledWriteMessages(): 0
message: content-71
Producer.getScheduledWriteMessages(): 0
message: content-72
Producer.getScheduledWriteMessages(): 0
message: content-73
Producer.getScheduledWriteMessages(): 0
message: content-74
Producer.getScheduledWriteMessages(): 0
message: content-75
Producer.getScheduledWriteMessages(): 0
message: content-76
Producer.getScheduledWriteMessages(): 0
message: content-77
Producer.getScheduledWriteMessages(): 0
message: content-78
Producer.getScheduledWriteMessages(): 0
message: content-79
Producer.getScheduledWriteMessages(): 0
message: content-80
Producer.getScheduledWriteMessages(): 0
message: content-81
Producer.getScheduledWriteMessages(): 0
message: content-82
Producer.getScheduledWriteMessages(): 0
message: content-83
Producer.getScheduledWriteMessages(): 0
message: content-84
Producer.getScheduledWriteMessages(): 0
message: content-85
Producer.getScheduledWriteMessages(): 0
message: content-86
Producer.getScheduledWriteMessages(): 0
message: content-87
Producer.getScheduledWriteMessages(): 0
message: content-88
Producer.getScheduledWriteMessages(): 0
message: content-89
Producer.getScheduledWriteMessages(): 0
message: content-90
Producer.getScheduledWriteMessages(): 0
message: content-91
Producer.getScheduledWriteMessages(): 0
message: content-92
Producer.getScheduledWriteMessages(): 0
message: content-93
Producer.getScheduledWriteMessages(): 0
message: content-94
Producer.getScheduledWriteMessages(): 0
message: content-95
Producer.getScheduledWriteMessages(): 0
message: content-96
Producer.getScheduledWriteMessages(): 0
message: content-97
Producer.getScheduledWriteMessages(): 0
message: content-98
Producer.getScheduledWriteMessages(): 0
message: content-99
Producer.getScheduledWriteMessages(): 0
message: content-100
Producer.getScheduledWriteMessages(): 0
message: content-101
Producer.getScheduledWriteMessages(): 0
message: content-102
Producer.getScheduledWriteMessages(): 0
message: content-103
Producer.getScheduledWriteMessages(): 0
message: content-104
Producer.getScheduledWriteMessages(): 0
message: content-105
Producer.getScheduledWriteMessages(): 0
message: content-106
Producer.getScheduledWriteMessages(): 0
message: content-107
Producer.getScheduledWriteMessages(): 0
message: content-108
Producer.getScheduledWriteMessages(): 0
message: content-109
Producer.getScheduledWriteMessages(): 0
message: content-110
Producer.getScheduledWriteMessages(): 0
message: content-111
Producer.getScheduledWriteMessages(): 0
message: content-112
Producer.getScheduledWriteMessages(): 0
message: content-113
Producer.getScheduledWriteMessages(): 0
message: content-114
Producer.getScheduledWriteMessages(): 0
message: content-115
Producer.getScheduledWriteMessages(): 0
message: content-116
Producer.getScheduledWriteMessages(): 0
message: content-117
Producer.getScheduledWriteMessages(): 0
message: content-118
Producer.getScheduledWriteMessages(): 0
message: content-119
Producer.getScheduledWriteMessages(): 0
message: content-120
Producer.getScheduledWriteMessages(): 0
message: content-121
Producer.getScheduledWriteMessages(): 0
message: content-122
Producer.getScheduledWriteMessages(): 0
message: content-123
Producer.getScheduledWriteMessages(): 0
message: content-124
Producer.getScheduledWriteMessages(): 0
message: content-125
Producer.getScheduledWriteMessages(): 0
message: content-126
Producer.getScheduledWriteMessages(): 0
message: content-127
Producer.getScheduledWriteMessages(): 0
message: content-128
Producer.getScheduledWriteMessages(): 0
message: content-129
Producer.getScheduledWriteMessages(): 0
message: content-130
Producer.getScheduledWriteMessages(): 0
message: content-131
Producer.getScheduledWriteMessages(): 0
message: content-132
Producer.getScheduledWriteMessages(): 0
message: content-133
Producer.getScheduledWriteMessages(): 0
message: content-134
Producer.getScheduledWriteMessages(): 0
message: content-135
Producer.getScheduledWriteMessages(): 0
message: content-136
Producer.getScheduledWriteMessages(): 0
message: content-137
Producer.getScheduledWriteMessages(): 0
message: content-138
Producer.getScheduledWriteMessages(): 0
message: content-139
Producer.getScheduledWriteMessages(): 0
message: content-140
Producer.getScheduledWriteMessages(): 0
message: content-141
Producer.getScheduledWriteMessages(): 0
message: content-142
Producer.getScheduledWriteMessages(): 0
message: content-143
Producer.getScheduledWriteMessages(): 0
message: content-144
Producer.getScheduledWriteMessages(): 0
message: content-145
Producer.getScheduledWriteMessages(): 0
message: content-146
Producer.getScheduledWriteMessages(): 0
message: content-147
Producer.getScheduledWriteMessages(): 0
message: content-148
Producer.getScheduledWriteMessages(): 0
message: content-149
Producer.getScheduledWriteMessages(): 0
message: content-150
Producer.getScheduledWriteMessages(): 0
message: content-151
Producer.getScheduledWriteMessages(): 0
message: content-152
Producer.getScheduledWriteMessages(): 0
message: content-153
Producer.getScheduledWriteMessages(): 0
message: content-154
Producer.getScheduledWriteMessages(): 0
message: content-155
Producer.getScheduledWriteMessages(): 0
message: content-156
Producer.getScheduledWriteMessages(): 0
message: content-157
Producer.getScheduledWriteMessages(): 0
message: content-158
Producer.getScheduledWriteMessages(): 0
message: content-159
Producer.getScheduledWriteMessages(): 0
message: content-160
Producer.getScheduledWriteMessages(): 0
message: content-161
Producer.getScheduledWriteMessages(): 0
message: content-162
Producer.getScheduledWriteMessages(): 0
message: content-163
Producer.getScheduledWriteMessages(): 0
message: content-164
Producer.getScheduledWriteMessages(): 0
message: content-165
Producer.getScheduledWriteMessages(): 0
message: content-166
Producer.getScheduledWriteMessages(): 0
message: content-167
Producer.getScheduledWriteMessages(): 0
message: content-168
Producer.getScheduledWriteMessages(): 0
message: content-169
Producer.getScheduledWriteMessages(): 0
message: content-170
Producer.getScheduledWriteMessages(): 0
message: content-171
Producer.getScheduledWriteMessages(): 0
message: content-172
Producer.getScheduledWriteMessages(): 0
message: content-173
Producer.getScheduledWriteMessages(): 0
message: content-174
Producer.getScheduledWriteMessages(): 0
message: content-175
Producer.getScheduledWriteMessages(): 0
message: content-176
Producer.getScheduledWriteMessages(): 0
message: content-177
Producer.getScheduledWriteMessages(): 0
message: content-178
Producer.getScheduledWriteMessages(): 0
message: content-179
Producer.getScheduledWriteMessages(): 0
message: content-180
Producer.getScheduledWriteMessages(): 0
message: content-181
Producer.getScheduledWriteMessages(): 0
message: content-182
Producer.getScheduledWriteMessages(): 0
message: content-183
Producer.getScheduledWriteMessages(): 0
message: content-184
Producer.getScheduledWriteMessages(): 0
message: content-185
Producer.getScheduledWriteMessages(): 0
message: content-186
Producer.getScheduledWriteMessages(): 0
message: content-187
Producer.getScheduledWriteMessages(): 0
message: content-188
Producer.getScheduledWriteMessages(): 0
message: content-189
Producer.getScheduledWriteMessages(): 0
message: content-190
Producer.getScheduledWriteMessages(): 0
message: content-191
Producer.getScheduledWriteMessages(): 0
message: content-192
Producer.getScheduledWriteMessages(): 0
message: content-193
Producer.getScheduledWriteMessages(): 0
message: content-194
Producer.getScheduledWriteMessages(): 0
message: content-195
Producer.getScheduledWriteMessages(): 0
message: content-196
Producer.getScheduledWriteMessages(): 0
message: content-197
Producer.getScheduledWriteMessages(): 0
message: content-198
Producer.getScheduledWriteMessages(): 0
message: content-199
Producer.getScheduledWriteMessages(): 0
message: content-200
Producer.getScheduledWriteMessages(): 0
message: content-201
Producer.getScheduledWriteMessages(): 0
message: content-202
Producer.getScheduledWriteMessages(): 0
message: content-203
Producer.getScheduledWriteMessages(): 0
message: content-204
Producer.getScheduledWriteMessages(): 0
message: content-205
Producer.getScheduledWriteMessages(): 0
message: content-206
Producer.getScheduledWriteMessages(): 0
message: content-207
Producer.getScheduledWriteMessages(): 0
message: content-208
Producer.getScheduledWriteMessages(): 0
message: content-209
Producer.getScheduledWriteMessages(): 0
message: content-210
Producer.getScheduledWriteMessages(): 0
message: content-211
Producer.getScheduledWriteMessages(): 0
message: content-212
Producer.getScheduledWriteMessages(): 0
message: content-213
Producer.getScheduledWriteMessages(): 0
message: content-214


--
Luis Neves


Re: Messages read while iosession is suspended.

Posted by Trustin Lee <tr...@gmail.com>.
On 7/24/07, Luis Neves <lu...@co.sapo.pt> wrote:
> Trustin Lee wrote:
>
>
> > suspend/resumeRead() is an asynchronous operation.  It means messages
> > can still be received *after* suspendRead() is called.  However, once
> > the suspension request is processed by SocketIoProcessor, no more
> > messages will be received.
> >
> > Do we need TrafficFuture or something similar for more fine-grained
> > control?
>
> humm... perhaps.
> I've been experimenting to try to find a way to solve my problem and I've found
> that if the producer sends a bunch of messages and then shuts down, the
> messageReceived() of the consumer keeps getting called long after the producer
> is dead. It looks like the messages are held in the consumer's internal (Mina)
> queue. What I think would also help me in this particular case is the ability to
> specify the maximum number of messages in the consumer queue... right? Is there
> any way to do that?

I created a JIRA issue related to the problem you are describing:

https://issues.apache.org/jira/browse/DIRMINA-405

I have already checked in the fix.  Please try the trunk and let me
know if it does the job for you.  I changed ProtocolCodecFilter so that
it does not fire the messageReceived event while read is suspended, and
instead holds decoded messages until read is resumed.
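
A minimal sketch of the "hold decoded messages until read is resumed"
idea, in plain Java. This is not the ProtocolCodecFilter code that was
checked in; the class HoldingDispatcher and its method names are
hypothetical and only illustrate the behaviour described above.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical illustration only; this is not MINA's ProtocolCodecFilter.
final class HoldingDispatcher<T> {
    private final Queue<T> held = new ArrayDeque<>();
    private final Consumer<T> handler;   // stands in for messageReceived()
    private boolean suspended;

    HoldingDispatcher(Consumer<T> handler) {
        this.handler = handler;
    }

    // Called for every decoded message, whether or not read is suspended.
    synchronized void onDecoded(T message) {
        if (suspended) {
            held.add(message);           // hold it instead of firing the event
        } else {
            handler.accept(message);
        }
    }

    synchronized void suspendRead() {
        suspended = true;
    }

    // Delivers the held messages in arrival order. Stops early if the
    // handler suspends read again while the backlog is being drained.
    synchronized void resumeRead() {
        suspended = false;
        T next;
        while (!suspended && (next = held.poll()) != null) {
            handler.accept(next);
        }
    }

    synchronized int heldCount() {
        return held.size();
    }
}
```

Note that the held queue in this sketch is unbounded, so it only moves
the backlog; it does not by itself limit how much the peer can send.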

Trustin
-- 
what we call human nature is actually human habit
--
http://gleamynode.net/
--
PGP Key ID: 0x0255ECA6

Re: Messages read while iosession is suspended.

Posted by Luis Neves <lu...@co.sapo.pt>.
Trustin Lee wrote:


> suspend/resumeRead() is an asynchronous operation.  It means messages
> can still be received *after* suspendRead() is called.  However, once
> the suspension request is processed by SocketIoProcessor, no more
> messages will be received.
> 
> Do we need TrafficFuture or something similar for more fine-grained 
> control?

humm... perhaps.
I've been experimenting to try to find a way to solve my problem and I've found 
that if the producer sends a bunch of messages and then shuts down, the 
messageReceived() of the consumer keeps getting called long after the producer 
is dead. It looks like the messages are held in the consumer's internal (Mina) 
queue. What I think would also help me in this particular case is the ability to 
specify the maximum number of messages in the consumer queue... right? Is there 
any way to do that?

--
Luis Neves


Re: Messages read while iosession is suspended.

Posted by Luis Neves <lu...@co.sapo.pt>.
Ian Cass wrote:
> Luis Neves wrote:

>> If all else fails I will have to figure out a way of doing this at
>> the protocol level. 
> 
> Surely all you need is sensible protocol level flow control?

It's the "sensible" part that I'm having trouble with :-)

> Do this:-
> 
> Get your sender to send a small number of simultaneous requests (your flow
> control window) - say 5. Then make the sender stop sending until it's got a
> response, keeping 5 outstanding requests on the wire. You should then
> implement a similar setup on the consumer just to handle badly behaving
> clients, reading a total of <window size> msgs before you suspend reads
> and resuming reads when you send a response.

Ok... but there is a big difference between:
- not delivering a message, and
- delivering a message but, for some reason, not receiving the ack.

The reason I mention this is that I'm experimenting with something like what 
you suggest, and I'm unsure how to deal with producers that send a batch of 
messages and are terminated before receiving the ack. When the producer comes 
back up it tries to redeliver the messages, and the result is duplicate 
messages at the consumer end. How do you deal with this problem?
I've not been able to find a simple solution for this; it's getting complex and 
I don't have a lot of confidence in the code.

It sounds much simpler just to block the producer if the consumer suspends the 
reads... but maybe this isn't possible.
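
One common way to tolerate redelivery is to make the consumer
idempotent. The sketch below assumes every message carries a unique ID
chosen by the producer; that ID, the class RecentIds and its capacity
parameter are assumptions and are not part of the protocol discussed in
this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: remembers the most recent message IDs so that a
// redelivered message can be acknowledged again without being reprocessed.
final class RecentIds {
    private final Map<String, Boolean> seen;

    RecentIds(final int capacity) {
        // Insertion-ordered map that evicts its oldest entry once it holds
        // more than `capacity` IDs.
        this.seen = new LinkedHashMap<String, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    // Returns true the first time an ID is offered, false for a duplicate
    // that is still remembered.
    synchronized boolean firstDelivery(String messageId) {
        return seen.put(messageId, Boolean.TRUE) == null;
    }
}
```

The consumer would process a message only when firstDelivery() returns
true, but send the ack in both cases. The sketch forgets IDs older than
its capacity and loses its state if the consumer restarts, so it only
covers redelivery shortly after a producer restart.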

Thanks!

--
Luis Neves



RE: Messages read while iosession is suspended.

Posted by Ian Cass <ia...@mblox.com>.
Luis Neves wrote:
 
>> It is my understanding that MINA will stop requesting incoming data
>> from the OS when you call suspendRead.
>> Eventually, the OS's buffer of incoming data will be full and TCP
>> will ask the other side to stop writing.
> 
> Yeah... my problem is with the "eventually" part, I really need a
> suspendReadNow(). 
> 
> If all else fails I will have to figure out a way of doing this at
> the protocol level. 

Surely all you need is sensible protocol level flow control?

Do this:-

Get your sender to send a small number of simultaneous requests (your flow
control window) - say 5. Then make the sender stop sending until it's got a
response, keeping 5 outstanding requests on the wire. You should then
implement a similar setup on the consumer just to handle badly behaving
clients, reading a total of <window size> msgs before you suspend reads
and resuming reads when you send a response.
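
A minimal sketch of the sender side of such a window, using a counting
semaphore. The class SendWindow and its method names are hypothetical;
how requests are written and how responses are matched to requests is
left to the protocol.

```java
import java.util.concurrent.Semaphore;

// Hypothetical sender-side window: at most `windowSize` requests may be
// awaiting a response at any time.
final class SendWindow {
    private final int windowSize;
    private final Semaphore slots;

    SendWindow(int windowSize) {
        this.windowSize = windowSize;
        this.slots = new Semaphore(windowSize);
    }

    // Blocks until a slot is free; call this before writing a request.
    void beginSend() throws InterruptedException {
        slots.acquire();
    }

    // Non-blocking variant: returns false if the window is currently full.
    boolean tryBeginSend() {
        return slots.tryAcquire();
    }

    // Call exactly once for each response received for an outstanding request.
    void onResponse() {
        slots.release();
    }

    // Number of requests sent but not yet answered.
    int outstanding() {
        return windowSize - slots.availablePermits();
    }
}
```

With a window of 5, the sixth beginSend() blocks until onResponse() has
been called for one of the first five requests.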

Only once you have proper flow control can you start to throttle; otherwise
you'll get flooded with msgs & end up with an out-of-memory condition. If you
rely on TCP buffers for flow control then be prepared for a huge amount of
data being buffered. Far better to do it at the protocol level.

I've been experimenting with throttling by queueing a runnable (in a
scheduled thread pool executor) that frees a slot in the flow
control window on the server after x milliseconds & resumes reads. It seems
to work ok. Tested up to 12k msgs/sec.
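
A rough sketch of that kind of throttle, under stated assumptions: the
class ThrottledWindow is hypothetical, and the two Runnables stand in
for calls such as session.suspendRead() and session.resumeRead(). It is
not the code that was tested at 12k msgs/sec.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical receiver-side throttle: each received message occupies a
// window slot for `slotDelayMillis`; reads are suspended while the window
// is full and resumed once a slot has been freed.
final class ThrottledWindow {
    private final int windowSize;
    private final long slotDelayMillis;
    private final ScheduledExecutorService scheduler;
    private final Runnable suspendReads;  // e.g. () -> session.suspendRead()
    private final Runnable resumeReads;   // e.g. () -> session.resumeRead()
    private int inUse;
    private boolean suspended;

    ThrottledWindow(int windowSize, long slotDelayMillis,
                    ScheduledExecutorService scheduler,
                    Runnable suspendReads, Runnable resumeReads) {
        this.windowSize = windowSize;
        this.slotDelayMillis = slotDelayMillis;
        this.scheduler = scheduler;
        this.suspendReads = suspendReads;
        this.resumeReads = resumeReads;
    }

    // Call once per received message.
    synchronized void onMessage() {
        inUse++;
        if (inUse >= windowSize && !suspended) {
            suspended = true;
            suspendReads.run();
        }
        // Free this message's slot after the configured delay.
        scheduler.schedule(this::freeSlot, slotDelayMillis, TimeUnit.MILLISECONDS);
    }

    private synchronized void freeSlot() {
        inUse--;
        if (inUse < windowSize && suspended) {
            suspended = false;
            resumeReads.run();
        }
    }
}
```

Because suspending reads is asynchronous, a few messages can still
arrive after the window is full; the sketch counts them, so inUse may
briefly exceed windowSize.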

-- 
Ian Cass

Re: Messages read while iosession is suspended.

Posted by Trustin Lee <tr...@gmail.com>.
On 7/25/07, Maarten Bosteels <mb...@gmail.com> wrote:
> On 7/25/07, Trustin Lee <tr...@gmail.com> wrote:
> >
> > Hi Luis,
> >
> > On 7/24/07, Luis Neves <lu...@co.sapo.pt> wrote:
> > > Yeah... my problem with the "eventually" part, I really need a
> > > suspendReadNow().
> >
> > I see.  Then why don't you create a JIRA issue?  We could add
> > TrafficControlFuture or something similar to trunk.  Once implemented,
> > you could do something like the following:
> >
> > session.suspendRead().await();
>
>
> Trustin,
>
> Although a TrafficControlFuture might be useful, it won't help in this use
> case: he wants to block the producer, which
> sits at the other side of the wire. I think the best way to go is to do this
> at the protocol level, as he mentioned.

You are right.  I guess we can take care of this issue by calling
traffic control methods *before* ExecutorFilter in the filter chain.
It will fragment the protocol logic though.

SocketIoProcessor could check if the traffic mask has changed after
each read, and stop reading the current channel for precise traffic
control.

WDYT?

Trustin
-- 
what we call human nature is actually human habit
--
http://gleamynode.net/
--
PGP Key ID: 0x0255ECA6

Re: Messages read while iosession is suspended.

Posted by Maarten Bosteels <mb...@gmail.com>.
On 7/25/07, Trustin Lee <tr...@gmail.com> wrote:
>
> Hi Luis,
>
> On 7/24/07, Luis Neves <lu...@co.sapo.pt> wrote:
> > Yeah... my problem is with the "eventually" part, I really need a
> > suspendReadNow().
>
> I see.  Then why don't you create a JIRA issue?  We could add
> TrafficControlFuture or something similar to trunk.  Once implemented,
> you could do something like the following:
>
> session.suspendRead().await();


Trustin,

Although a TrafficControlFuture might be useful, it won't help in this use
case: he wants to block the producer, which
sits at the other side of the wire. I think the best way to go is to do this
at the protocol level, as he mentioned.

Maarten

> Thanks for the input,
> Trustin
> --
> what we call human nature is actually human habit
> --
> http://gleamynode.net/
> --
> PGP Key ID: 0x0255ECA6
>

Re: Messages read while iosession is suspended.

Posted by Trustin Lee <tr...@gmail.com>.
Hi Luis,

On 7/24/07, Luis Neves <lu...@co.sapo.pt> wrote:
> Yeah... my problem is with the "eventually" part, I really need a suspendReadNow().

I see.  Then why don't you create a JIRA issue?  We could add
TrafficControlFuture or something similar to trunk.  Once implemented,
you could do something like the following:

session.suspendRead().await();

Thanks for the input,
Trustin
-- 
what we call human nature is actually human habit
--
http://gleamynode.net/
--
PGP Key ID: 0x0255ECA6

Re: Messages read while iosession is suspended.

Posted by Luis Neves <lu...@co.sapo.pt>.
Maarten Bosteels wrote:
> Hi Luis,
> 
> Are your producer and consumer two processes that use MINA to communicate ?
> So you are calling ioSession.suspendRead in process A (the consumer end)
> in the hope that this would block the producer in process B (at the other
> end of the wire), right ?

Yes, that's it exactly.

> I guess this would work, if you are using TCP:
> 
> It is my understanding that MINA will stop requesting incoming data from
> the OS when you call suspendRead.
> Eventually, the OS's buffer of incoming data will be full and TCP will ask
> the other side to stop writing.

Yeah... my problem is with the "eventually" part, I really need a suspendReadNow().

If all else fails I will have to figure out a way of doing this at the protocol 
level.


Maarten, Trustin ... thanks!

--
Luis Neves

Re: Messages read while iosession is suspended.

Posted by Maarten Bosteels <mb...@gmail.com>.
Hi Luis,

Are your producer and consumer two processes that use MINA to communicate?
So you are calling ioSession.suspendRead in process A (the consumer end)
in the hope that this would block the producer in process B (at the other
end of the wire), right?

I guess this would work, if you are using TCP:

It is my understanding that MINA will stop requesting incoming data from the
OS when you call suspendRead.
Eventually, the OS's buffer of incoming data will be full and TCP will ask
the other side to stop writing.

from
http://en.wikipedia.org/wiki/Transmission_Control_Protocol#TCP_window_size

"The TCP receive window size is the amount of received data (in bytes) that
can be buffered during a connection. The sending host can send only up to
that amount of data before it must wait for an acknowledgment and window
update from the receiving host. When a receiver advertises the window size
of 0, the sender stops sending data and starts the persist timer."
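
The "eventually" can be observed with plain java.nio, without MINA. The
sketch below is only an illustration: it connects two channels over
loopback, never reads on the receiving side, and counts how many bytes
a non-blocking sender can hand to the OS before write() first returns
0. The class name TcpBackpressureDemo and the 1 GiB safety cap are
assumptions.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class TcpBackpressureDemo {
    // Returns the number of bytes accepted by the OS before a non-blocking
    // write() first returned 0, while the receiver never reads.
    static long bytesAcceptedBeforeStall() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel sender = SocketChannel.open(server.getLocalAddress());
                 SocketChannel receiver = server.accept()) {  // kept open, never read
                sender.configureBlocking(false);
                ByteBuffer chunk = ByteBuffer.allocate(8192);
                final long cap = 1L << 30;  // safety limit so the loop always ends
                long total = 0;
                while (total < cap) {
                    chunk.clear();
                    int written = sender.write(chunk);
                    if (written == 0) {
                        break;  // local send buffer is full: the sender is stalled
                    }
                    total += written;
                }
                return total;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bytes accepted before stall: " + bytesAcceptedBeforeStall());
    }
}
```

The result depends on the OS socket buffer sizes, and the first zero
write can happen before the receiver's buffer is completely full, so
the number is a lower bound. Everything counted here was accepted from
the application even though the receiver never read it, which is why a
blocking write on the producer does not stop immediately after the
consumer suspends reads.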

Maarten

On 7/24/07, Trustin Lee <tr...@gmail.com> wrote:
>
> Hi Luis,
>
> On 7/22/07, Luis Neves <lu...@co.sapo.pt> wrote:
> >
> > Hello all.
> > I think I'm misunderstanding something fundamental about how the
> > IoSession.suspendRead() and IoSession.resumeRead() methods work and I
> > would appreciate any insight you guys can offer.
> > I have a producer and consumer both based on Mina. What I want to
> > achieve is to block the producer while the consumer is still
> > processing the message.
> >
> > The way I thought I could do this was to do something like this on the
> > producer side:
> >
> > [...]
> > WriteFuture wf = ioSession.write(message);
> > wf.awaitUninterruptibly();
> > [...]
> >
> > and on the consumer side:
> >
> > public void messageReceived(IoSession iosession, Object message) throws Exception
> > {
> > long readMsg = iosession.getReadMessages();
> > iosession.suspendRead();
> > [... slow message processing ...]
> > iosession.resumeRead();
> > }
> >
> > I was under the impression that the iosession.suspendRead() would make
> > the producer block, but this is not the behaviour I'm seeing.
> > What I'm seeing is that even with the IoSession in suspend state the
> > producer manages to successfully write the messages and the
> > iosession.getReadMessages() in the consumer side keeps getting bigger.
> > Are my expectations wrong? Is there any way I can achieve the desired
> > behaviour with Mina?
>
> suspend/resumeRead() is an asynchronous operation.  It means messages
> can still be received *after* suspendRead() is called.  However, once
> the suspension request is processed by SocketIoProcessor, no more
> messages will be received.
>
> Do we need TrafficFuture or something similar for more fine-grained
> control?
>
> Trustin
> --
> what we call human nature is actually human habit
> --
> http://gleamynode.net/
> --
> PGP Key ID: 0x0255ECA6
>

Re: Messages read while iosession is suspended.

Posted by Trustin Lee <tr...@gmail.com>.
Hi Luis,

On 7/22/07, Luis Neves <lu...@co.sapo.pt> wrote:
>
> Hello all.
> I think I'm misunderstanding something fundamental about how the
> IoSession.suspendRead() and IoSession.resumeRead() methods work and I would
> appreciate any insight you guys can offer.
> I have a producer and consumer both based on Mina. What I want to achieve is to
> block the producer while the consumer is still processing the message.
>
> The way I thought I could do this was to do something like this on the producer side:
>
> [...]
> WriteFuture wf = ioSession.write(message);
> wf.awaitUninterruptibly();
> [...]
>
> and in the consumer side:
>
> public void messageReceived(IoSession iosession, Object message) throws Exception
> {
> long readMsg = iosession.getReadMessages();
> iosession.suspendRead();
> [... slow message processing ...]
> iosession.resumeRead();
> }
>
> I was under the impression that the iosession.suspendRead()  would make the
> producer block, but this is not the behaviour I'm seeing.
> What I'm seeing is that even with the IoSession in suspend state the
> producer manages to successfully write the messages and the
> iosession.getReadMessages() in the consumer side keeps getting bigger.
> Are my expectations wrong? Is there any way I can achieve the desired behaviour
> with Mina?

suspend/resumeRead() is an asynchronous operation.  It means messages
can still be received *after* suspendRead() is called.  However, once
the suspension request is processed by SocketIoProcessor, no more
messages will be received.

Do we need TrafficFuture or something similar for more fine-grained control?

Trustin
-- 
what we call human nature is actually human habit
--
http://gleamynode.net/
--
PGP Key ID: 0x0255ECA6