You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@qpid.apache.org by Jiri Daněk <jd...@redhat.com> on 2022/10/10 15:15:31 UTC
[protonj2] How do I force the library to send Detach frame on the wire?
I want to remove a durable subscriber that I have created on the remote
peer, which is the Artemis messaging broker.
How do I do this correctly? (Also, as a broader question, where do I find
complete instructions for working with durable subscriptions on Artemis
broker using an AMQP client, as opposed to JMS 2.0 client?)
Currently, I am using this code
https://github.com/rh-messaging/cli-java/blob/6c2cfc02984cfd93b44d306973f83422f716aeba/cli-protonj2/src/main/java/com/redhat/mqe/CliProtonJ2Receiver.java#L240-L243
that is, I create a receiver and then call receiver.close(). (If I wanted
to keep the durable subscriber active, I'd do receiver.detach() instead).
This is what happens when I export PN_TRACE_FRM=true
-> SASL:[1658511941:0] AMQP,3,1,0,0
<- SASL:[1658511941:0] AMQP,3,1,0,0
<- SASL:[1658511941:0] SaslMechanisms{saslServerMechanisms=[PLAIN,
ANONYMOUS]}
-> SASL:[1658511941:0] SaslInit{mechanism=PLAIN,
initialResponse="\x00admin\x00admin", hostname='null'}
<- SASL:[1658511941:0] SaslOutcome{code=OK, additionalData=null}
-> AMQP:[1658511941:0] AMQP,0,1,0,0
<- AMQP:[1658511941:0] AMQP,0,1,0,0
-> AMQP:[1658511941:0] Open{ containerId='cliId0', hostname='127.0.0.1',
maxFrameSize=65536, channelMax=65535, idleTimeOut=60000,
outgoingLocales=null, incomingLocales=null, offeredCapabilities=null,
desiredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY,
SHARED-SUBS, ANONYMOUS-RELAY], properties=null}
-> AMQP:[1658511941:0] Close{error=null}
<- AMQP:[1658511941:0] Open{ containerId='amq', hostname='null',
maxFrameSize=131072, channelMax=65535, idleTimeOut=30000,
outgoingLocales=null, incomingLocales=null,
offeredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY,
SHARED-SUBS, ANONYMOUS-RELAY], desiredCapabilities=null,
properties={product=apache-activemq-artemis, version=2.26.0}}
<- AMQP:[1658511941:0] Close{error=null}
Client does not even bother doing Begin/Attach/Detach! So the durable
subscriber stays active.
What I want to happen is something similar to what I can get with Qpid JMS,
which is
[1253727474:0] -> SASL
[1253727474:0] <- SASL
[1253727474:0] <- SaslMechanisms{saslServerMechanisms=[PLAIN, ANONYMOUS]}
[1253727474:0] -> SaslInit{mechanism=PLAIN,
initialResponse=\x00admin\x00admin, hostname='127.0.0.1'}
[1253727474:0] <- SaslOutcome{_code=OK, _additionalData=null}
[1253727474:0] <- AMQP
[1253727474:0] -> AMQP
[1253727474:0] -> Open{ containerId='cliId0', hostname='127.0.0.1',
maxFrameSize=1048576, channelMax=32767, idleTimeOut=30000,
outgoingLocales=null, incomingLocales=null, offeredCapabilities=null,
desiredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY,
ANONYMOUS-RELAY, SHARED-SUBS], properties={product=QpidJMS, version=2.1.0,
platform=JVM: 11.0.17-ea, 11.0.17-ea+1, Red Hat, Inc., OS: Linux,
5.19.11-300.fc37.x86_64, amd64}}
[1253727474:0] <- Open{ containerId='amq', hostname='null',
maxFrameSize=131072, channelMax=65535, idleTimeOut=30000,
outgoingLocales=null, incomingLocales=null,
offeredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY,
SHARED-SUBS, ANONYMOUS-RELAY], desiredCapabilities=null,
properties={product=apache-activemq-artemis, version=2.26.0}}
[1253727474:0] -> Begin{remoteChannel=null, nextOutgoingId=1,
incomingWindow=2047, outgoingWindow=2147483647, handleMax=65535,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1253727474:0] <- Begin{remoteChannel=0, nextOutgoingId=1,
incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1253727474:1] -> Begin{remoteChannel=null, nextOutgoingId=1,
incomingWindow=2047, outgoingWindow=2147483647, handleMax=65535,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1253727474:1] <- Begin{remoteChannel=1, nextOutgoingId=1,
incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1253727474:0] -> Attach{name='ds0', handle=0, role=RECEIVER,
sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=null,
target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END,
timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null},
unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null,
maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null,
properties=null}
[1253727474:0] <- Attach{name='ds0', handle=0, role=SENDER,
sndSettleMode=UNSETTLED, rcvSettleMode=FIRST,
source=Source{address='cliId0.ds0', durable=UNSETTLED_STATE,
expiryPolicy=NEVER, timeout=0, dynamic=false, dynamicNodeProperties=null,
distributionMode=copy, filter=null, defaultOutcome=null, outcomes=null,
capabilities=[topic]}, target=Target{address='null', durable=NONE,
expiryPolicy=SESSION_END, timeout=0, dynamic=false,
dynamicNodeProperties=null, capabilities=null}, unsettled=null,
incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1253727474:0] -> Detach{handle=0, closed=true, error=null}
[1253727474:0] <- Detach{handle=0, closed=true, error=null}
[1253727474:1] -> End{error=null}
[1253727474:1] <- End{error=null}
[1253727474:0] -> Close{error=null}
[1253727474:0] <- Close{error=null}
As a workaround, I figured I can do
if (stringToBool(subscriberUnsubscribeString)) {
receiver.receive(1, TimeUnit.SECOND); // add this receive
call
receiver.close();
return 0;
}
but that does something different than the Qpid JMS example (it sends flow
and actually would receive messages, if there was any).
When I tried tryReceive instead of receive, the AMQP trace stayed the same
as it was originally, that is, no Begin/Attach/Detach was produced either.
--
Mit freundlichen Grüßen / Kind regards
Jiri Daněk
Re: [protonj2] How do I force the library to send Detach frame on the wire?
Posted by Timothy Bish <ta...@gmail.com>.
On 10/10/22 11:15, Jiri Daněk wrote:
> I want to remove a durable subscriber that I have created on the remote
> peer, which is the Artemis messaging broker.
>
> How do I do this correctly? (Also, as a broader question, where do I find
> complete instructions for working with durable subscriptions on Artemis
> broker using an AMQP client, as opposed to JMS 2.0 client?)
>
> Currently, I am using this code
> https://github.com/rh-messaging/cli-java/blob/6c2cfc02984cfd93b44d306973f83422f716aeba/cli-protonj2/src/main/java/com/redhat/mqe/CliProtonJ2Receiver.java#L240-L243
>
> that is, I create a receiver and then call receiver.close(). (If I wanted
> to keep the durable subscriber active, I'd do receiver.detach() instead).
>
> This is what happens when I export PN_TRACE_FRM=true
>
> -> SASL:[1658511941:0] AMQP,3,1,0,0
> <- SASL:[1658511941:0] AMQP,3,1,0,0
> <- SASL:[1658511941:0] SaslMechanisms{saslServerMechanisms=[PLAIN,
> ANONYMOUS]}
> -> SASL:[1658511941:0] SaslInit{mechanism=PLAIN,
> initialResponse="\x00admin\x00admin", hostname='null'}
> <- SASL:[1658511941:0] SaslOutcome{code=OK, additionalData=null}
> -> AMQP:[1658511941:0] AMQP,0,1,0,0
> <- AMQP:[1658511941:0] AMQP,0,1,0,0
> -> AMQP:[1658511941:0] Open{ containerId='cliId0', hostname='127.0.0.1',
> maxFrameSize=65536, channelMax=65535, idleTimeOut=60000,
> outgoingLocales=null, incomingLocales=null, offeredCapabilities=null,
> desiredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY,
> SHARED-SUBS, ANONYMOUS-RELAY], properties=null}
> -> AMQP:[1658511941:0] Close{error=null}
> <- AMQP:[1658511941:0] Open{ containerId='amq', hostname='null',
> maxFrameSize=131072, channelMax=65535, idleTimeOut=30000,
> outgoingLocales=null, incomingLocales=null,
> offeredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY,
> SHARED-SUBS, ANONYMOUS-RELAY], desiredCapabilities=null,
> properties={product=apache-activemq-artemis, version=2.26.0}}
> <- AMQP:[1658511941:0] Close{error=null}
>
> Client does not even bother doing Begin/Attach/Detach! So the durable
> subscriber stays active.
You need to use the openFuture of the Receiver to wait until the remote
has fully opened it and then close the receiver to ensure that you get
the behavior you are looking for. In this case it is quite likely that
the connection was still not fully setup when you opened and closed the
receiver resulting in the engine deciding there was no need to send the
performatives since you opened and closed before the connection likely
finished the SASL exchange. For an operation like this it makes sense
to always ensure the receiver fully opened as the open can fail but
closing it wouldn't tell you that.
>
> What I want to happen is something similar to what I can get with Qpid JMS,
> which is
>
> [1253727474:0] -> SASL
> [1253727474:0] <- SASL
> [1253727474:0] <- SaslMechanisms{saslServerMechanisms=[PLAIN, ANONYMOUS]}
> [1253727474:0] -> SaslInit{mechanism=PLAIN,
> initialResponse=\x00admin\x00admin, hostname='127.0.0.1'}
> [1253727474:0] <- SaslOutcome{_code=OK, _additionalData=null}
> [1253727474:0] <- AMQP
> [1253727474:0] -> AMQP
> [1253727474:0] -> Open{ containerId='cliId0', hostname='127.0.0.1',
> maxFrameSize=1048576, channelMax=32767, idleTimeOut=30000,
> outgoingLocales=null, incomingLocales=null, offeredCapabilities=null,
> desiredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY,
> ANONYMOUS-RELAY, SHARED-SUBS], properties={product=QpidJMS, version=2.1.0,
> platform=JVM: 11.0.17-ea, 11.0.17-ea+1, Red Hat, Inc., OS: Linux,
> 5.19.11-300.fc37.x86_64, amd64}}
> [1253727474:0] <- Open{ containerId='amq', hostname='null',
> maxFrameSize=131072, channelMax=65535, idleTimeOut=30000,
> outgoingLocales=null, incomingLocales=null,
> offeredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY,
> SHARED-SUBS, ANONYMOUS-RELAY], desiredCapabilities=null,
> properties={product=apache-activemq-artemis, version=2.26.0}}
> [1253727474:0] -> Begin{remoteChannel=null, nextOutgoingId=1,
> incomingWindow=2047, outgoingWindow=2147483647, handleMax=65535,
> offeredCapabilities=null, desiredCapabilities=null, properties=null}
> [1253727474:0] <- Begin{remoteChannel=0, nextOutgoingId=1,
> incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535,
> offeredCapabilities=null, desiredCapabilities=null, properties=null}
> [1253727474:1] -> Begin{remoteChannel=null, nextOutgoingId=1,
> incomingWindow=2047, outgoingWindow=2147483647, handleMax=65535,
> offeredCapabilities=null, desiredCapabilities=null, properties=null}
> [1253727474:1] <- Begin{remoteChannel=1, nextOutgoingId=1,
> incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535,
> offeredCapabilities=null, desiredCapabilities=null, properties=null}
> [1253727474:0] -> Attach{name='ds0', handle=0, role=RECEIVER,
> sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=null,
> target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END,
> timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null},
> unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null,
> maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null,
> properties=null}
> [1253727474:0] <- Attach{name='ds0', handle=0, role=SENDER,
> sndSettleMode=UNSETTLED, rcvSettleMode=FIRST,
> source=Source{address='cliId0.ds0', durable=UNSETTLED_STATE,
> expiryPolicy=NEVER, timeout=0, dynamic=false, dynamicNodeProperties=null,
> distributionMode=copy, filter=null, defaultOutcome=null, outcomes=null,
> capabilities=[topic]}, target=Target{address='null', durable=NONE,
> expiryPolicy=SESSION_END, timeout=0, dynamic=false,
> dynamicNodeProperties=null, capabilities=null}, unsettled=null,
> incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null,
> offeredCapabilities=null, desiredCapabilities=null, properties=null}
> [1253727474:0] -> Detach{handle=0, closed=true, error=null}
> [1253727474:0] <- Detach{handle=0, closed=true, error=null}
> [1253727474:1] -> End{error=null}
> [1253727474:1] <- End{error=null}
> [1253727474:0] -> Close{error=null}
> [1253727474:0] <- Close{error=null}
>
> As a workaround, I figured I can do
>
> if (stringToBool(subscriberUnsubscribeString)) {
> receiver.receive(1, TimeUnit.SECOND); // add this receive
> call
> receiver.close();
> return 0;
> }
>
> but that does something different than the Qpid JMS example (it sends flow
> and actually would receive messages, if there was any).
>
> When I tried tryReceive instead of receive, the AMQP trace stayed the same
> as it was originally, that is, no Begin/Attach/Detach was produced either.
--
Tim Bish
---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org