You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Christian Posta <ch...@gmail.com> on 2012/10/22 22:01:10 UTC

Re: ActiveMQ (5.7 snapshot) and MQTT: connection times out while still connected to a topic for receiving

Nope, you're not missing anything. There is a bug in there. Taking a look
right now...



On Tue, Sep 25, 2012 at 2:49 PM, Aliquip <al...@gmail.com> wrote:

> ActiveMQ (5.7 snapshot) and MQTT: connection times out while still
> connected
> to a topic for receiving
>
> I'm using mosquitto's python client to connect to an mqtt topic. When using
> the mosquitto broker,
> all works (but that broker doesn't allow me to plug in authentication to my
> database, so it's no real option).
>
> When using activemq, the connection is disconnected. The clients does ping,
> and the broker does respond. But it doesn't have seem an effect as the
> connection still times out.
>
> Am i missing some configuration option?
>
>
> broker config (scala):
> ===================================================================
> val broker = new BrokerService()
> broker.setPlugins(Array(new UpoAuthenticationPlugin(admin_user, admin_pw)))
>
> broker.addConnector("mqtt+nio://127.0.0.1:1883")
> broker.addConnector("vm://localhost")
> broker.start()
> ===================================================================
>
> python client:
> ===================================================================
> import mosquitto
> import os
> import time
>
> client = mosquitto.Mosquitto("test-client")
> client.username_pw_set("user", "password")
>
> def on_connect(mosq, obj, rc):
>     if rc == 0:
>         print("Connected successfully.")
>
> client.on_connect = on_connect
>
> def on_disconnect(mosq, obj, rc):
>     print("Disconnected successfully.")
>
> client.on_disconnect = on_disconnect
>
> def on_publish(mosq, obj, mid):
>     print("Message "+str(mid)+" published.")
>
> client.on_publish = on_publish
>
> def on_message(mosq, obj, msg):
>     print("Message received on topic "+msg.topic+" with QoS
> "+str(msg.qos)+"
> and payload "+msg.payload)
>
> client.on_message = on_message
>
> def on_subscribe(mosq, obj, mid, qos_list):
>     print("Subscribe with mid "+str(mid)+" received.")
>
> client.on_subscribe = on_subscribe
>
> client.connect("127.0.0.1", keepalive=5)
>
> client.subscribe("groups/Nameless", 1)
> client.publish("groups/Nameless", "hello world", 1)
>
> while True:
>     print "loop" , client.loop()
>     time.sleep(2)
>     pass
> ===================================================================
>
> ActiveMQ log
> ===================================================================
> [debug] o.a.a.t.m.MQTTProtocolConverter - MQTT Client  connected.
> [debug] o.a.a.b.r.AbstractRegion - localhost adding consumer:
> ID:Jeroen-Laptop-60782-1348606334006-9:4:-1:1 for destination:
> topic://groups.Nameless
> [debug] o.a.a.b.r.AbstractRegion - localhost adding destination:
> topic://ActiveMQ.Advisory.Consumer.Topic.groups.Nameless
> [trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - Transport
> Connection to: tcp://127.0.0.1:61332
> [trace] o.a.a.t.PooledTaskRunner - Running task iteration 1 - Transport
> Connection to: tcp://127.0.0.1:61332
> [trace] o.a.a.t.PooledTaskRunner - Run task done: Transport Connection to:
> tcp://127.0.0.1:61332
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
> [debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
> [debug] o.a.a.b.r.Queue - queue://test expiring messages ..
> [debug] o.a.a.b.r.Queue - test toPageIn: 0, Inflight: 0,
> pagedInMessages.size 0, enqueueCount: 0, dequeueCount: 0
> [trace] o.a.a.b.r.c.AbstractStoreCursor -
> org.apache.activemq.broker.region.cursors
> QueueStorePrefetch@a7fd9bb
> :test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200
> - fillBatch
> [trace] o.a.a.b.r.c.AbstractStoreCursor -
> org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb
> :test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200
> - fillBatch
> [debug] o.a.a.b.r.Queue - queue://test expiring messages done.
> [trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - queue://test,
> subscriptions=0, memory=0%, size=0, in flight groups=null
> [debug] o.a.a.b.r.Queue - queue://test expiring messages ..
> [trace] o.a.a.t.PooledTaskRunner - Run task done: queue://test,
> subscriptions=0, memory=0%, size=0, in flight groups=null
> [debug] o.a.a.b.r.Queue - test toPageIn: 0, Inflight: 0,
> pagedInMessages.size 0, enqueueCount: 0, dequeueCount: 0
> [trace] o.a.a.b.r.c.AbstractStoreCursor -
> org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb
> :test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200
> - fillBatch
> [trace] o.a.a.b.r.c.AbstractStoreCursor -
> org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb
> :test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200
> - fillBatch
> [debug] o.a.a.b.r.Queue - queue://test expiring messages done.
> [trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - queue://test,
> subscriptions=0, memory=0%, size=0, in flight groups=null
> [trace] o.a.a.t.PooledTaskRunner - Run task done: queue://test,
> subscriptions=0, memory=0%, size=0, in flight groups=null
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
> [trace] o.a.a.s.k.MessageDatabase - Last update: 1:3698624, full gc
> candidates set: [1]
> [trace] o.a.a.s.k.MessageDatabase - gc candidates after first tx:1:3698624,
> []
> [trace] o.a.a.s.k.MessageDatabase - gc candidates: []
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
> [debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
> [debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
> [debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
> [debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
> [trace] o.a.a.t.m.MQTTInactivityMonitor - Message received since last read
> check, resetting flag:
> [debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
> [debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
> [debug] o.a.a.b.r.Queue - queue://test expiring messages ..
> [debug] o.a.a.b.r.Queue - test toPageIn: 0, Inflight: 0,
> pagedInMessages.size 0, enqueueCount: 0, dequeueCount: 0
> [trace] o.a.a.b.r.c.AbstractStoreCursor -
> org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb
> :test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200
> - fillBatch
> [trace] o.a.a.b.r.c.AbstractStoreCursor -
> org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb
> :test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200
> - fillBatch
> [debug] o.a.a.b.r.Queue - queue://test expiring messages done.
> [trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - queue://test,
> subscriptions=0, memory=0%, size=0, in flight groups=null
> [debug] o.a.a.b.r.Queue - queue://test expiring messages ..
> [trace] o.a.a.t.PooledTaskRunner - Run task done: queue://test,
> subscriptions=0, memory=0%, size=0, in flight groups=null
> [debug] o.a.a.b.r.Queue - test toPageIn: 0, Inflight: 0,
> pagedInMessages.size 0, enqueueCount: 0, dequeueCount: 0
> [trace] o.a.a.b.r.c.AbstractStoreCursor -
> org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb
> :test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200
> - fillBatch
> [trace] o.a.a.b.r.c.AbstractStoreCursor -
> org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb
> :test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200
> - fillBatch
> [debug] o.a.a.b.r.Queue - queue://test expiring messages done.
> [trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - queue://test,
> subscriptions=0, memory=0%, size=0, in flight groups=null
> [trace] o.a.a.t.PooledTaskRunner - Run task done: queue://test,
> subscriptions=0, memory=0%, size=0, in flight groups=null
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
> [trace] o.a.a.s.k.MessageDatabase - Last update: 1:3701042, full gc
> candidates set: [1]
> [trace] o.a.a.s.k.MessageDatabase - gc candidates after first tx:1:3701042,
> []
> [trace] o.a.a.s.k.MessageDatabase - gc candidates: []
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
> [debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
> [debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
> [debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
> [debug] o.a.a.t.m.MQTTProtocolConverter - Sent Ping Response to
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
> [debug] o.a.a.t.m.MQTTInactivityMonitor - 30001 ms elapsed since last read
> check.
> [debug] o.a.a.t.m.MQTTInactivityMonitor - No message received since last
> read check for tcp:///127.0.0.1:61332@1883! Throwing
> InactivityIOException.
> [trace] o.a.a.u.ThreadPoolUtils - Shutdown of ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@6e3271eb with await termination: 0
> millis
> [debug] o.a.a.u.ThreadPoolUtils - Shutdown of ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@6e3271eb is shutdown: true and
> terminated: false took: 0.014 seconds.
> [debug] o.a.a.b.T.Transport - Transport Connection to: tcp://
> 127.0.0.1:61332
> failed: org.apache.activemq.transport.InactivityIOException: Channel was
> inactive for too (>5000) long:
> tcp://127.0.0.1:61332org.apache.activemq.transport.InactivityIOException:
> Channel was inactive for too (>5000) long: tcp://127.0.0.1:61332
>         at
>
> org.apache.activemq.transport.mqtt.MQTTInactivityMonitor$2.run(MQTTInactivityMonitor.java:133)
> [activemq-core-5.7-SNAPSHOT.jar:5.7-SNAPSHOT]
>         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown
> Source) [na:1.6.0_24]
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
> Source) [na:1.6.0_24]
>         at java.lang.Thread.run(Unknown Source) [na:1.6.0_24]
> [debug] o.a.a.b.j.ManagementContext - Unregistering MBean
>
> org.apache.activemq:BrokerName=localhost,Type=Connection,ConnectorName=mqtt+nio_//localhost_1883,Connection=test-client
> [debug] o.a.a.b.j.ManagementContext - Unregistering MBean
>
> org.apache.activemq:BrokerName=localhost,Type=Connection,ConnectorName=mqtt+nio_//localhost_1883,ViewType=address,Name=tcp_//127.0.0.1_61332
> [trace] o.a.a.t.TaskRunnerFactory - Execute[ActiveMQ
> BrokerService[localhost] Task] runnable:
> org.apache.activemq.broker.TransportConnection$4@79bd7026
> [debug] o.a.a.b.TransportConnection - Stopping connection:
> tcp://127.0.0.1:61332
> [debug] o.a.a.t.t.TcpTransport - Stopping transport
> tcp:///127.0.0.1:61332@1883
> [debug] o.a.a.t.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ
> Task] using ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@40e0d3b
> [trace] o.a.a.t.TaskRunnerFactory - Execute[ActiveMQ Task] runnable:
> org.apache.activemq.transport.tcp.TcpTransport$1@36869e91
> [trace] o.a.a.t.TaskRunnerFactory - Created thread[ActiveMQ Task-1]:
> Thread[ActiveMQ Task-1,5,main]
> [trace] o.a.a.t.t.TcpTransport - Closing socket
> Socket[addr=/127.0.0.1,port=61332,localport=1883]
> [debug] o.a.a.t.t.TcpTransport - Closed socket Socket[unconnected]
> [debug] o.a.a.u.ThreadPoolUtils - Forcing shutdown of ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@40e0d3b
> [trace] o.a.a.u.ThreadPoolUtils - Shutdown of ExecutorService:
> java.util.concurrent.ThreadPoolExecutor@40e0d3b is shutdown: true and
> terminated: false.
> [debug] o.a.a.b.TransportConnection - Stopped transport: null
> [trace] o.a.a.t.PooledTaskRunner - Shutdown timeout: Transport Connection
> to: null task: {}
> [debug] o.a.a.b.TransportConnection - Cleaning up connection resources:
> null
> [debug] o.a.a.b.TransportConnection - remove connection id:
> ID:Jeroen-Laptop-60782-1348606334006-9:4
> [debug] o.a.a.b.j.ManagementContext - Unregistering MBean
>
> org.apache.activemq:BrokerName=localhost,Type=Subscription,persistentMode=Non-Durable,destinationType=Topic,destinationName=groups.Nameless,clientId=test-client,consumerId=ID_Jeroen-Laptop-60782-1348606334006-9_4_-1_1
> [debug] o.a.a.b.r.AbstractRegion - localhost removing consumer:
> ID:Jeroen-Laptop-60782-1348606334006-9:4:-1:1 for destination:
> topic://groups.Nameless
> [debug] o.a.a.b.j.ManagementContext - Unregistering MBean
>
> org.apache.activemq:BrokerName=localhost,Type=Producer,destinationType=Dynamic,clientId=test-client,producerId=ID_Jeroen-Laptop-60782-1348606334006-9_4_-1_1
> [debug] o.a.a.b.TransportConnection - Connection Stopped: null
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint started.
> [debug] o.a.a.s.k.MessageDatabase - Checkpoint done.
> [debug] o.a.a.b.r.Queue - queue://test expiring messages ..
> [debug] o.a.a.b.r.Queue - test toPageIn: 0, Inflight: 0,
> pagedInMessages.size 0, enqueueCount: 0, dequeueCount: 0
> [trace] o.a.a.b.r.c.AbstractStoreCursor -
> org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb
> :test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200
> - fillBatch
> [trace] o.a.a.b.r.c.AbstractStoreCursor -
> org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb
> :test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200
> - fillBatch
> [debug] o.a.a.b.r.Queue - queue://test expiring messages done.
> [trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - queue://test,
> subscriptions=0, memory=0%, size=0, in flight groups=null
> [debug] o.a.a.b.r.Queue - queue://test expiring messages ..
> [trace] o.a.a.t.PooledTaskRunner - Run task done: queue://test,
> subscriptions=0, memory=0%, size=0, in flight groups=null
> [debug] o.a.a.b.r.Queue - test toPageIn: 0, Inflight: 0,
> pagedInMessages.size 0, enqueueCount: 0, dequeueCount: 0
> [trace] o.a.a.b.r.c.AbstractStoreCursor -
> org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb
> :test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200
> - fillBatch
> [trace] o.a.a.b.r.c.AbstractStoreCursor -
> org.apache.activemq.broker.region.cursors.QueueStorePrefetch@a7fd9bb
> :test,batchResetNeeded=false,storeHasMessages=false,size=0,cacheEnabled=true,maxBatchSize:200
> - fillBatch
> [debug] o.a.a.b.r.Queue - queue://test expiring messages done.
> [trace] o.a.a.t.PooledTaskRunner - Running task iteration 0 - queue://test,
> subscriptions=0, memory=0%, size=0, in flight groups=null
> [trace] o.a.a.t.PooledTaskRunner - Run task done: queue://test,
> subscriptions=0, memory=0%, size=0, in flight groups=null
> ===================================================================
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/ActiveMQ-5-7-snapshot-and-MQTT-connection-times-out-while-still-connected-to-a-topic-for-receiving-tp4656976.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>



-- 
*Christian Posta*
http://www.christianposta.com/blog