You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@qpid.apache.org by Jiri Daněk <jd...@redhat.com> on 2022/10/10 15:15:31 UTC

[protonj2] How do I force the library to send Detach frame on the wire?

I want to remove a durable subscriber that I have created on the remote
peer, which is the Artemis messaging broker.

How do I do this correctly? (Also, as a broader question, where do I find
complete instructions for working with durable subscriptions on Artemis
broker using an AMQP client, as opposed to JMS 2.0 client?)

Currently, I am using this code
https://github.com/rh-messaging/cli-java/blob/6c2cfc02984cfd93b44d306973f83422f716aeba/cli-protonj2/src/main/java/com/redhat/mqe/CliProtonJ2Receiver.java#L240-L243

that is, I create a receiver and then call receiver.close(). (If I wanted
to keep the durable subscriber active, I'd do receiver.detach() instead).

This is what happens when I export PN_TRACE_FRM=true

-> SASL:[1658511941:0] AMQP,3,1,0,0
<- SASL:[1658511941:0] AMQP,3,1,0,0
<- SASL:[1658511941:0] SaslMechanisms{saslServerMechanisms=[PLAIN,
ANONYMOUS]}
-> SASL:[1658511941:0] SaslInit{mechanism=PLAIN,
initialResponse="\x00admin\x00admin", hostname='null'}
<- SASL:[1658511941:0] SaslOutcome{code=OK, additionalData=null}
-> AMQP:[1658511941:0] AMQP,0,1,0,0
<- AMQP:[1658511941:0] AMQP,0,1,0,0
-> AMQP:[1658511941:0] Open{ containerId='cliId0', hostname='127.0.0.1',
maxFrameSize=65536, channelMax=65535, idleTimeOut=60000,
outgoingLocales=null, incomingLocales=null, offeredCapabilities=null,
desiredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY,
SHARED-SUBS, ANONYMOUS-RELAY], properties=null}
-> AMQP:[1658511941:0] Close{error=null}
<- AMQP:[1658511941:0] Open{ containerId='amq', hostname='null',
maxFrameSize=131072, channelMax=65535, idleTimeOut=30000,
outgoingLocales=null, incomingLocales=null,
offeredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY,
SHARED-SUBS, ANONYMOUS-RELAY], desiredCapabilities=null,
properties={product=apache-activemq-artemis, version=2.26.0}}
<- AMQP:[1658511941:0] Close{error=null}

Client does not even bother doing Begin/Attach/Detach! So the durable
subscriber stays active.

What I want to happen is something similar to what I can get with Qpid JMS,
which is

[1253727474:0] -> SASL
[1253727474:0] <- SASL
[1253727474:0] <- SaslMechanisms{saslServerMechanisms=[PLAIN, ANONYMOUS]}
[1253727474:0] -> SaslInit{mechanism=PLAIN,
initialResponse=\x00admin\x00admin, hostname='127.0.0.1'}
[1253727474:0] <- SaslOutcome{_code=OK, _additionalData=null}
[1253727474:0] <- AMQP
[1253727474:0] -> AMQP
[1253727474:0] -> Open{ containerId='cliId0', hostname='127.0.0.1',
maxFrameSize=1048576, channelMax=32767, idleTimeOut=30000,
outgoingLocales=null, incomingLocales=null, offeredCapabilities=null,
desiredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY,
ANONYMOUS-RELAY, SHARED-SUBS], properties={product=QpidJMS, version=2.1.0,
platform=JVM: 11.0.17-ea, 11.0.17-ea+1, Red Hat, Inc., OS: Linux,
5.19.11-300.fc37.x86_64, amd64}}
[1253727474:0] <- Open{ containerId='amq', hostname='null',
maxFrameSize=131072, channelMax=65535, idleTimeOut=30000,
outgoingLocales=null, incomingLocales=null,
offeredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY,
SHARED-SUBS, ANONYMOUS-RELAY], desiredCapabilities=null,
properties={product=apache-activemq-artemis, version=2.26.0}}
[1253727474:0] -> Begin{remoteChannel=null, nextOutgoingId=1,
incomingWindow=2047, outgoingWindow=2147483647, handleMax=65535,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1253727474:0] <- Begin{remoteChannel=0, nextOutgoingId=1,
incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1253727474:1] -> Begin{remoteChannel=null, nextOutgoingId=1,
incomingWindow=2047, outgoingWindow=2147483647, handleMax=65535,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1253727474:1] <- Begin{remoteChannel=1, nextOutgoingId=1,
incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1253727474:0] -> Attach{name='ds0', handle=0, role=RECEIVER,
sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=null,
target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END,
timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null},
unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null,
maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null,
properties=null}
[1253727474:0] <- Attach{name='ds0', handle=0, role=SENDER,
sndSettleMode=UNSETTLED, rcvSettleMode=FIRST,
source=Source{address='cliId0.ds0', durable=UNSETTLED_STATE,
expiryPolicy=NEVER, timeout=0, dynamic=false, dynamicNodeProperties=null,
distributionMode=copy, filter=null, defaultOutcome=null, outcomes=null,
capabilities=[topic]}, target=Target{address='null', durable=NONE,
expiryPolicy=SESSION_END, timeout=0, dynamic=false,
dynamicNodeProperties=null, capabilities=null}, unsettled=null,
incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1253727474:0] -> Detach{handle=0, closed=true, error=null}
[1253727474:0] <- Detach{handle=0, closed=true, error=null}
[1253727474:1] -> End{error=null}
[1253727474:1] <- End{error=null}
[1253727474:0] -> Close{error=null}
[1253727474:0] <- Close{error=null}

As a workaround, I figured I can do

            if (stringToBool(subscriberUnsubscribeString)) {
                receiver.receive(1, TimeUnit.SECOND);  // add this receive
call
                receiver.close();
                return 0;
            }

but that does something different than the Qpid JMS example (it sends flow
and actually would receive messages, if there was any).

When I tried tryReceive instead of receive, the AMQP trace stayed the same
as it was originally, that is, no Begin/Attach/Detach was produced either.
-- 
Mit freundlichen Grüßen / Kind regards
Jiri Daněk

Re: [protonj2] How do I force the library to send Detach frame on the wire?

Posted by Timothy Bish <ta...@gmail.com>.
On 10/10/22 11:15, Jiri Daněk wrote:
> I want to remove a durable subscriber that I have created on the remote
> peer, which is the Artemis messaging broker.
>
> How do I do this correctly? (Also, as a broader question, where do I find
> complete instructions for working with durable subscriptions on Artemis
> broker using an AMQP client, as opposed to JMS 2.0 client?)
>
> Currently, I am using this code
> https://github.com/rh-messaging/cli-java/blob/6c2cfc02984cfd93b44d306973f83422f716aeba/cli-protonj2/src/main/java/com/redhat/mqe/CliProtonJ2Receiver.java#L240-L243
>
> that is, I create a receiver and then call receiver.close(). (If I wanted
> to keep the durable subscriber active, I'd do receiver.detach() instead).
>
> This is what happens when I export PN_TRACE_FRM=true
>
> -> SASL:[1658511941:0] AMQP,3,1,0,0
> <- SASL:[1658511941:0] AMQP,3,1,0,0
> <- SASL:[1658511941:0] SaslMechanisms{saslServerMechanisms=[PLAIN,
> ANONYMOUS]}
> -> SASL:[1658511941:0] SaslInit{mechanism=PLAIN,
> initialResponse="\x00admin\x00admin", hostname='null'}
> <- SASL:[1658511941:0] SaslOutcome{code=OK, additionalData=null}
> -> AMQP:[1658511941:0] AMQP,0,1,0,0
> <- AMQP:[1658511941:0] AMQP,0,1,0,0
> -> AMQP:[1658511941:0] Open{ containerId='cliId0', hostname='127.0.0.1',
> maxFrameSize=65536, channelMax=65535, idleTimeOut=60000,
> outgoingLocales=null, incomingLocales=null, offeredCapabilities=null,
> desiredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY,
> SHARED-SUBS, ANONYMOUS-RELAY], properties=null}
> -> AMQP:[1658511941:0] Close{error=null}
> <- AMQP:[1658511941:0] Open{ containerId='amq', hostname='null',
> maxFrameSize=131072, channelMax=65535, idleTimeOut=30000,
> outgoingLocales=null, incomingLocales=null,
> offeredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY,
> SHARED-SUBS, ANONYMOUS-RELAY], desiredCapabilities=null,
> properties={product=apache-activemq-artemis, version=2.26.0}}
> <- AMQP:[1658511941:0] Close{error=null}
>
> Client does not even bother doing Begin/Attach/Detach! So the durable
> subscriber stays active.

You need to use the openFuture of the Receiver to wait until the remote 
has fully opened it and then close the receiver to ensure that you get 
the behavior you are looking for.  In this case it is quite likely that 
the connection was still not fully setup when you opened and closed the 
receiver resulting in the engine deciding there was no need to send the 
performatives since you opened and closed before the connection likely 
finished the SASL exchange.  For an operation like this it makes sense 
to always ensure the receiver fully opened as the open can fail but 
closing it wouldn't tell you that.


>
> What I want to happen is something similar to what I can get with Qpid JMS,
> which is
>
> [1253727474:0] -> SASL
> [1253727474:0] <- SASL
> [1253727474:0] <- SaslMechanisms{saslServerMechanisms=[PLAIN, ANONYMOUS]}
> [1253727474:0] -> SaslInit{mechanism=PLAIN,
> initialResponse=\x00admin\x00admin, hostname='127.0.0.1'}
> [1253727474:0] <- SaslOutcome{_code=OK, _additionalData=null}
> [1253727474:0] <- AMQP
> [1253727474:0] -> AMQP
> [1253727474:0] -> Open{ containerId='cliId0', hostname='127.0.0.1',
> maxFrameSize=1048576, channelMax=32767, idleTimeOut=30000,
> outgoingLocales=null, incomingLocales=null, offeredCapabilities=null,
> desiredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY,
> ANONYMOUS-RELAY, SHARED-SUBS], properties={product=QpidJMS, version=2.1.0,
> platform=JVM: 11.0.17-ea, 11.0.17-ea+1, Red Hat, Inc., OS: Linux,
> 5.19.11-300.fc37.x86_64, amd64}}
> [1253727474:0] <- Open{ containerId='amq', hostname='null',
> maxFrameSize=131072, channelMax=65535, idleTimeOut=30000,
> outgoingLocales=null, incomingLocales=null,
> offeredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY,
> SHARED-SUBS, ANONYMOUS-RELAY], desiredCapabilities=null,
> properties={product=apache-activemq-artemis, version=2.26.0}}
> [1253727474:0] -> Begin{remoteChannel=null, nextOutgoingId=1,
> incomingWindow=2047, outgoingWindow=2147483647, handleMax=65535,
> offeredCapabilities=null, desiredCapabilities=null, properties=null}
> [1253727474:0] <- Begin{remoteChannel=0, nextOutgoingId=1,
> incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535,
> offeredCapabilities=null, desiredCapabilities=null, properties=null}
> [1253727474:1] -> Begin{remoteChannel=null, nextOutgoingId=1,
> incomingWindow=2047, outgoingWindow=2147483647, handleMax=65535,
> offeredCapabilities=null, desiredCapabilities=null, properties=null}
> [1253727474:1] <- Begin{remoteChannel=1, nextOutgoingId=1,
> incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535,
> offeredCapabilities=null, desiredCapabilities=null, properties=null}
> [1253727474:0] -> Attach{name='ds0', handle=0, role=RECEIVER,
> sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=null,
> target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END,
> timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null},
> unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null,
> maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null,
> properties=null}
> [1253727474:0] <- Attach{name='ds0', handle=0, role=SENDER,
> sndSettleMode=UNSETTLED, rcvSettleMode=FIRST,
> source=Source{address='cliId0.ds0', durable=UNSETTLED_STATE,
> expiryPolicy=NEVER, timeout=0, dynamic=false, dynamicNodeProperties=null,
> distributionMode=copy, filter=null, defaultOutcome=null, outcomes=null,
> capabilities=[topic]}, target=Target{address='null', durable=NONE,
> expiryPolicy=SESSION_END, timeout=0, dynamic=false,
> dynamicNodeProperties=null, capabilities=null}, unsettled=null,
> incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null,
> offeredCapabilities=null, desiredCapabilities=null, properties=null}
> [1253727474:0] -> Detach{handle=0, closed=true, error=null}
> [1253727474:0] <- Detach{handle=0, closed=true, error=null}
> [1253727474:1] -> End{error=null}
> [1253727474:1] <- End{error=null}
> [1253727474:0] -> Close{error=null}
> [1253727474:0] <- Close{error=null}
>
> As a workaround, I figured I can do
>
>              if (stringToBool(subscriberUnsubscribeString)) {
>                  receiver.receive(1, TimeUnit.SECOND);  // add this receive
> call
>                  receiver.close();
>                  return 0;
>              }
>
> but that does something different than the Qpid JMS example (it sends flow
> and actually would receive messages, if there was any).
>
> When I tried tryReceive instead of receive, the AMQP trace stayed the same
> as it was originally, that is, no Begin/Attach/Detach was produced either.


-- 
Tim Bish


---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org