You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/11/18 18:11:41 UTC

git commit: Fixes AMQ-4896 - MQTT does not properly restore durable subs with the Paho client.

Updated Branches:
  refs/heads/trunk de5838660 -> bc4f4e92a


Fixes AMQ-4896 - MQTT does not properly restore durable subs with the Paho client.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bc4f4e92
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bc4f4e92
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bc4f4e92

Branch: refs/heads/trunk
Commit: bc4f4e92a6e38032c91748764e5ef4f03b5d4140
Parents: de58386
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Mon Nov 18 12:11:34 2013 -0500
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Mon Nov 18 12:11:34 2013 -0500

----------------------------------------------------------------------
 .../activemq/broker/TransportConnection.java    |  3 +
 .../store/PersistenceAdapterSupport.java        | 47 ++++++++++
 .../activemq/transport/ws/MQTTSocket.java       | 25 ++++-
 .../transport/mqtt/MQTTNIOTransportFactory.java |  6 +-
 .../transport/mqtt/MQTTProtocolConverter.java   | 98 ++++++++++++--------
 .../transport/mqtt/MQTTSslTransportFactory.java |  6 +-
 .../transport/mqtt/MQTTTransportFactory.java    |  6 +-
 .../transport/mqtt/MQTTTransportFilter.java     |  5 +-
 8 files changed, 139 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/bc4f4e92/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 1ce75a5..3a9a405 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -137,6 +137,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
         this.stopTaskRunnerFactory = stopTaskRunnerFactory;
         this.transport = transport;
         final BrokerService brokerService = this.broker.getBrokerService();
+        if( this.transport instanceof BrokerServiceAware ) {
+            ((BrokerServiceAware)this.transport).setBrokerService(brokerService);
+        }
         this.transport.setTransportListener(new DefaultTransportListener() {
             @Override
             public void onCommand(Object o) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/bc4f4e92/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapterSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapterSupport.java b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapterSupport.java
new file mode 100644
index 0000000..aca4574
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapterSupport.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.SubscriptionInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Used to implement common PersistenceAdapter methods.
+ */
+public class PersistenceAdapterSupport {
+
+    static public List<SubscriptionInfo> listSubscriptions(PersistenceAdapter pa, String clientId) throws IOException {
+        ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>();
+        for (ActiveMQDestination destination : pa.getDestinations()) {
+            if( destination.isTopic() ) {
+                TopicMessageStore store = pa.createTopicMessageStore((ActiveMQTopic) destination);
+                for (SubscriptionInfo sub : store.getAllSubscriptions()) {
+                    if(clientId==sub.getClientId() || clientId.equals(sub.getClientId()) ) {
+                        rc.add(sub);
+                    }
+                }
+            }
+        }
+        return rc;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/bc4f4e92/activemq-http/src/main/java/org/apache/activemq/transport/ws/MQTTSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/MQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/MQTTSocket.java
index 2e112ab..047c459 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/MQTTSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/MQTTSocket.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.transport.ws;
 
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.transport.TransportSupport;
 import org.apache.activemq.transport.mqtt.MQTTInactivityMonitor;
@@ -35,13 +37,14 @@ import java.io.IOException;
 import java.security.cert.X509Certificate;
 import java.util.concurrent.CountDownLatch;
 
-public class MQTTSocket  extends TransportSupport implements WebSocket.OnBinaryMessage, MQTTTransport {
+public class MQTTSocket  extends TransportSupport implements WebSocket.OnBinaryMessage, MQTTTransport, BrokerServiceAware {
 
     private static final Logger LOG = LoggerFactory.getLogger(MQTTSocket.class);
     Connection outbound;
-    MQTTProtocolConverter protocolConverter = new MQTTProtocolConverter(this, null);
+    MQTTProtocolConverter protocolConverter = null;
     MQTTWireFormat wireFormat = new MQTTWireFormat();
     private final CountDownLatch socketTransportStarted = new CountDownLatch(1);
+    private BrokerService brokerService;
 
     @Override
     public void onMessage(byte[] bytes, int offset, int length) {
@@ -56,12 +59,19 @@ public class MQTTSocket  extends TransportSupport implements WebSocket.OnBinaryM
 
         try {
             MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length));
-            protocolConverter.onMQTTCommand(frame);
+            getProtocolConverter().onMQTTCommand(frame);
         } catch (Exception e) {
             onException(IOExceptionSupport.create(e));
         }
     }
 
+    private MQTTProtocolConverter getProtocolConverter() {
+        if( protocolConverter == null ) {
+            protocolConverter = new MQTTProtocolConverter(this, brokerService);
+        }
+        return protocolConverter;
+    }
+
     @Override
     public void onOpen(Connection connection) {
         this.outbound = connection;
@@ -70,7 +80,7 @@ public class MQTTSocket  extends TransportSupport implements WebSocket.OnBinaryM
     @Override
     public void onClose(int closeCode, String message) {
         try {
-            protocolConverter.onMQTTCommand(new DISCONNECT().encode());
+            getProtocolConverter().onMQTTCommand(new DISCONNECT().encode());
         } catch (Exception e) {
             LOG.warn("Failed to close WebSocket", e);
         }
@@ -101,7 +111,7 @@ public class MQTTSocket  extends TransportSupport implements WebSocket.OnBinaryM
     @Override
     public void oneway(Object command) throws IOException {
         try {
-            protocolConverter.onActiveMQCommand((Command)command);
+            getProtocolConverter().onActiveMQCommand((Command) command);
         } catch (Exception e) {
             onException(IOExceptionSupport.create(e));
         }
@@ -132,4 +142,9 @@ public class MQTTSocket  extends TransportSupport implements WebSocket.OnBinaryM
     public MQTTWireFormat getWireFormat() {
         return wireFormat;
     }
+
+    @Override
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerService = brokerService;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/bc4f4e92/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
index 52fa228..96f7747 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
@@ -42,7 +42,7 @@ import org.apache.activemq.wireformat.WireFormat;
  */
 public class MQTTNIOTransportFactory extends NIOTransportFactory implements BrokerServiceAware {
 
-    private BrokerContext brokerContext = null;
+    private BrokerService brokerService = null;
 
     protected String getDefaultWireFormatType() {
         return "mqtt";
@@ -77,13 +77,13 @@ public class MQTTNIOTransportFactory extends NIOTransportFactory implements Brok
 
     @SuppressWarnings("rawtypes")
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
-        transport = new MQTTTransportFilter(transport, format, brokerContext);
+        transport = new MQTTTransportFilter(transport, format, brokerService);
         IntrospectionSupport.setProperties(transport, options);
         return super.compositeConfigure(transport, format, options);
     }
 
     public void setBrokerService(BrokerService brokerService) {
-        this.brokerContext = brokerService.getBrokerContext();
+        this.brokerService = brokerService;
     }
 
     protected Transport createInactivityMonitor(Transport transport, WireFormat format) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/bc4f4e92/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index ac598e7..d4c05eb 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -17,6 +17,8 @@
 package org.apache.activemq.transport.mqtt;
 
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -28,29 +30,10 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 
 import org.apache.activemq.broker.BrokerContext;
-import org.apache.activemq.command.ActiveMQBytesMessage;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMapMessage;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConnectionError;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.ExceptionResponse;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveInfo;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.SessionId;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.*;
+import org.apache.activemq.store.PersistenceAdapterSupport;
+import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOExceptionSupport;
@@ -102,6 +85,7 @@ public class MQTTProtocolConverter {
     private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE);
     private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE);
     private final MQTTTransport mqttTransport;
+    private final BrokerService brokerService;
 
     private final Object commnadIdMutex = new Object();
     private int lastCommandId;
@@ -113,8 +97,9 @@ public class MQTTProtocolConverter {
     private int activeMQSubscriptionPrefetch=1;
     private final String QOS_PROPERTY_NAME = "QoSPropertyName";
 
-    public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerContext brokerContext) {
+    public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerService brokerService) {
         this.mqttTransport = mqttTransport;
+        this.brokerService = brokerService;
         this.defaultKeepAlive = 0;
     }
 
@@ -269,12 +254,43 @@ public class MQTTProtocolConverter {
                         connected.set(true);
                         getMQTTTransport().sendToMQTT(ack.encode());
 
+                        List<SubscriptionInfo> subs = PersistenceAdapterSupport.listSubscriptions(brokerService.getPersistenceAdapter(), connectionInfo.getClientId());
+                        if( connect.cleanSession() ) {
+                            deleteDurableSubs(subs);
+                        } else {
+                            restoreDurableSubs(subs);
+                        }
                     }
                 });
             }
         });
     }
 
+    public void deleteDurableSubs(List<SubscriptionInfo> subs) {
+        try {
+            for (SubscriptionInfo sub : subs) {
+                TopicMessageStore store = brokerService.getPersistenceAdapter().createTopicMessageStore((ActiveMQTopic) sub.getDestination());
+                store.deleteSubscription(connectionInfo.getClientId(), sub.getSubscriptionName());
+            }
+        } catch (IOException e) {
+            LOG.warn("Could not delete the MQTT durable subs.", e);
+        }
+    }
+
+    public void restoreDurableSubs(List<SubscriptionInfo> subs) {
+        try {
+            SUBSCRIBE command = new SUBSCRIBE();
+            for (SubscriptionInfo sub : subs) {
+                String name = sub.getSubcriptionName();
+                String[] split = name.split(":", 2);
+                QoS qoS = QoS.valueOf(split[0]);
+                onSubscribe(new Topic(split[1], qoS));
+            }
+        } catch (IOException e) {
+            LOG.warn("Could not restore the MQTT durable subs.", e);
+        }
+    }
+
     void onMQTTDisconnect() throws MQTTProtocolException {
         if (connected.get()) {
             connected.set(false);
@@ -290,7 +306,7 @@ public class MQTTProtocolConverter {
         if (topics != null) {
             byte[] qos = new byte[topics.length];
             for (int i = 0; i < topics.length; i++) {
-                qos[i] = (byte) onSubscribe(command, topics[i]).ordinal();
+                qos[i] = (byte) onSubscribe(topics[i]).ordinal();
             }
             SUBACK ack = new SUBACK();
             ack.messageId(command.messageId());
@@ -305,25 +321,25 @@ public class MQTTProtocolConverter {
         }
     }
 
-    QoS onSubscribe(SUBSCRIBE command, Topic topic) throws MQTTProtocolException {
-        ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
+    QoS onSubscribe(Topic topic) throws MQTTProtocolException {
+        if( !mqttSubscriptionByTopic.containsKey(topic.name()) ) {
+            ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topic.name().toString()));
 
-        ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
-        ConsumerInfo consumerInfo = new ConsumerInfo(id);
-        consumerInfo.setDestination(destination);
-        consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch());
-        consumerInfo.setDispatchAsync(true);
-        if (!connect.cleanSession() && (connect.clientId() != null)) {
-            //by default subscribers are persistent
-            consumerInfo.setSubscriptionName(
-                connect.clientId().toString() + topic.name().toString());
-        }
-        MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
+            ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
+            ConsumerInfo consumerInfo = new ConsumerInfo(id);
+            consumerInfo.setDestination(destination);
+            consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch());
+            consumerInfo.setDispatchAsync(true);
+            if (!connect.cleanSession() && (connect.clientId() != null)) {
+                consumerInfo.setSubscriptionName(topic.qos()+":"+topic.name().toString());
+            }
+            MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
 
-        subscriptionsByConsumerId.put(id, mqttSubscription);
-        mqttSubscriptionByTopic.put(topic.name(), mqttSubscription);
+            subscriptionsByConsumerId.put(id, mqttSubscription);
+            mqttSubscriptionByTopic.put(topic.name(), mqttSubscription);
 
-        sendToActiveMQ(consumerInfo, null);
+            sendToActiveMQ(consumerInfo, null);
+        }
         return topic.qos();
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/bc4f4e92/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSslTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSslTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSslTransportFactory.java
index 1bb12f2..a0d32b1 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSslTransportFactory.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTSslTransportFactory.java
@@ -33,7 +33,7 @@ import org.apache.activemq.wireformat.WireFormat;
  */
 public class MQTTSslTransportFactory extends SslTransportFactory implements BrokerServiceAware {
 
-    private BrokerContext brokerContext = null;
+    private BrokerService brokerService = null;
 
     protected String getDefaultWireFormatType() {
         return "mqtt";
@@ -42,7 +42,7 @@ public class MQTTSslTransportFactory extends SslTransportFactory implements Brok
     @SuppressWarnings("rawtypes")
 
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
-        transport = new MQTTTransportFilter(transport, format, brokerContext);
+        transport = new MQTTTransportFilter(transport, format, brokerService);
         IntrospectionSupport.setProperties(transport, options);
         return super.compositeConfigure(transport, format, options);
     }
@@ -61,7 +61,7 @@ public class MQTTSslTransportFactory extends SslTransportFactory implements Brok
     }
 
     public void setBrokerService(BrokerService brokerService) {
-        this.brokerContext = brokerService.getBrokerContext();
+        this.brokerService = brokerService;
     }
 
     protected Transport createInactivityMonitor(Transport transport, WireFormat format) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/bc4f4e92/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java
index 7b4696a..da23ba5 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFactory.java
@@ -40,7 +40,7 @@ import javax.net.ServerSocketFactory;
  */
 public class MQTTTransportFactory extends TcpTransportFactory implements BrokerServiceAware {
 
-    private BrokerContext brokerContext = null;
+    private BrokerService brokerService = null;
 
     protected String getDefaultWireFormatType() {
         return "mqtt";
@@ -54,13 +54,13 @@ public class MQTTTransportFactory extends TcpTransportFactory implements BrokerS
 
     @SuppressWarnings("rawtypes")
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
-        transport = new MQTTTransportFilter(transport, format, brokerContext);
+        transport = new MQTTTransportFilter(transport, format, brokerService);
         IntrospectionSupport.setProperties(transport, options);
         return super.compositeConfigure(transport, format, options);
     }
 
     public void setBrokerService(BrokerService brokerService) {
-        this.brokerContext = brokerService.getBrokerContext();
+        this.brokerService = brokerService;
     }
 
     @SuppressWarnings("rawtypes")

http://git-wip-us.apache.org/repos/asf/activemq/blob/bc4f4e92/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
index 805fd3f..1dcf3dc 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import javax.jms.JMSException;
 
 import org.apache.activemq.broker.BrokerContext;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFilter;
@@ -50,9 +51,9 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
 
     private boolean trace;
 
-    public MQTTTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
+    public MQTTTransportFilter(Transport next, WireFormat wireFormat, BrokerService brokerService) {
         super(next);
-        this.protocolConverter = new MQTTProtocolConverter(this, brokerContext);
+        this.protocolConverter = new MQTTProtocolConverter(this, brokerService);
 
         if (wireFormat instanceof MQTTWireFormat) {
             this.wireFormat = (MQTTWireFormat) wireFormat;