You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by jb...@apache.org on 2016/10/21 06:07:58 UTC

[1/2] karaf-decanter git commit: [KARAF-4799] Add text message support in JMS collector, and corresponding test

Repository: karaf-decanter
Updated Branches:
  refs/heads/master 93d29f040 -> 7f5cbf44e


[KARAF-4799] Add text message support in JMS collector, and corresponding test


Project: http://git-wip-us.apache.org/repos/asf/karaf-decanter/repo
Commit: http://git-wip-us.apache.org/repos/asf/karaf-decanter/commit/f1409497
Tree: http://git-wip-us.apache.org/repos/asf/karaf-decanter/tree/f1409497
Diff: http://git-wip-us.apache.org/repos/asf/karaf-decanter/diff/f1409497

Branch: refs/heads/master
Commit: f140949774db111fbf0022a7a10fe51bb24aee2f
Parents: 93d29f0
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Fri Oct 21 07:35:50 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Fri Oct 21 08:03:46 2016 +0200

----------------------------------------------------------------------
 collector/jms/pom.xml                           |  29 ++-
 .../decanter/collector/jms/JmsCollector.java    | 113 +++++++---
 .../collector/jms/JmsCollectorTest.java         | 209 +++++++++++++++++++
 3 files changed, 317 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/f1409497/collector/jms/pom.xml
----------------------------------------------------------------------
diff --git a/collector/jms/pom.xml b/collector/jms/pom.xml
index ec45abc..3a15212 100644
--- a/collector/jms/pom.xml
+++ b/collector/jms/pom.xml
@@ -35,12 +35,39 @@
 
     <dependencies>
         <dependency>
+            <groupId>org.apache.karaf.decanter</groupId>
+            <artifactId>org.apache.karaf.decanter.api</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.geronimo.specs</groupId>
             <artifactId>geronimo-jms_1.1_spec</artifactId>
             <version>1.1.1</version>
         </dependency>
-    </dependencies>
 
+        <!-- test -->
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-broker</artifactId>
+            <version>${activemq.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-kahadb-store</artifactId>
+            <version>${activemq.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.karaf.decanter.marshaller</groupId>
+            <artifactId>org.apache.karaf.decanter.marshaller.json</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
 
     <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/f1409497/collector/jms/src/main/java/org/apache/karaf/decanter/collector/jms/JmsCollector.java
----------------------------------------------------------------------
diff --git a/collector/jms/src/main/java/org/apache/karaf/decanter/collector/jms/JmsCollector.java b/collector/jms/src/main/java/org/apache/karaf/decanter/collector/jms/JmsCollector.java
index 103908b..ee1e5b8 100644
--- a/collector/jms/src/main/java/org/apache/karaf/decanter/collector/jms/JmsCollector.java
+++ b/collector/jms/src/main/java/org/apache/karaf/decanter/collector/jms/JmsCollector.java
@@ -16,6 +16,7 @@
  */
 package org.apache.karaf.decanter.collector.jms;
 
+import org.apache.karaf.decanter.api.marshaller.Unmarshaller;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -28,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.jms.*;
+import java.io.ByteArrayInputStream;
 import java.net.InetAddress;
 import java.util.Dictionary;
 import java.util.Enumeration;
@@ -51,6 +53,7 @@ public class JmsCollector {
     private String destinationType;
 
     private EventAdmin dispatcher;
+    private Unmarshaller unmarshaller;
     private Connection connection;
     private Session session;
 
@@ -67,7 +70,7 @@ public class JmsCollector {
         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Destination destination = createDestination(session);
         MessageConsumer consumer = session.createConsumer(destination);
-        consumer.setMessageListener(new DecanterMessageListener(dispatcher));
+        consumer.setMessageListener(new DecanterMessageListener(dispatcher, unmarshaller));
         connection.start();
     }
 
@@ -123,56 +126,100 @@ public class JmsCollector {
         this.dispatcher = dispatcher;
     }
 
+    @Reference
+    public void setUnmarshaller(Unmarshaller unmarshaller) {
+        this.unmarshaller = unmarshaller;
+    }
+
     public class DecanterMessageListener implements MessageListener {
 
         private EventAdmin dispatcher;
+        private Unmarshaller unmarshaller;
 
-        public DecanterMessageListener(EventAdmin dispatcher) {
+        public DecanterMessageListener(EventAdmin dispatcher, Unmarshaller unmarshaller) {
             this.dispatcher = dispatcher;
+            this.unmarshaller = unmarshaller;
         }
 
         @Override
         public void onMessage(Message message) {
-            if (!(message instanceof MapMessage)) {
-                LOGGER.warn("JMS is not a MapMessage.");
+            if (!(message instanceof MapMessage) && !(message instanceof TextMessage)) {
+                LOGGER.warn("JMS is not a MapMessage or a TextMessage.");
                 return;
             }
 
-            MapMessage mapMessage = (MapMessage) message;
-
-            try {
-                Map<String, Object> data = new HashMap<>();
+            if (message instanceof MapMessage) {
+                MapMessage mapMessage = (MapMessage) message;
 
                 try {
-                    data.put("hostAddress", InetAddress.getLocalHost().getHostAddress());
-                    data.put("hostName", InetAddress.getLocalHost().getHostName());
+                    Map<String, Object> data = new HashMap<>();
+
+                    try {
+                        data.put("hostAddress", InetAddress.getLocalHost().getHostAddress());
+                        data.put("hostName", InetAddress.getLocalHost().getHostName());
+                    } catch (Exception e) {
+                        LOGGER.warn("Can't populate local host name and address", e);
+                    }
+
+                    // custom fields
+                    Enumeration<String> keys = properties.keys();
+                    while (keys.hasMoreElements()) {
+                        String key = keys.nextElement();
+                        data.put(key, properties.get(key));
+                    }
+
+                    Enumeration names = mapMessage.getMapNames();
+                    while (names.hasMoreElements()) {
+                        String name = (String) names.nextElement();
+                        data.put(name, mapMessage.getObject(name));
+                    }
+
+                    data.put("type", "jms");
+                    String karafName = System.getProperty("karaf.name");
+                    if (karafName != null) {
+                        data.put("karafName", karafName);
+                    }
+
+                    Event event = new Event(dispatcherTopic, data);
+                    dispatcher.postEvent(event);
                 } catch (Exception e) {
-                    LOGGER.warn("Can't populate local host name and address", e);
-                }
-
-                // custom fields
-                Enumeration<String> keys = properties.keys();
-                while (keys.hasMoreElements()) {
-                    String key = keys.nextElement();
-                    data.put(key, properties.get(key));
-                }
-
-                Enumeration names = mapMessage.getMapNames();
-                while (names.hasMoreElements()) {
-                    String name = (String) names.nextElement();
-                    data.put(name, mapMessage.getObject(name));
+                    LOGGER.warn("Can't process JMS message", e);
                 }
+            }
+            if (message instanceof TextMessage) {
+                TextMessage textMessage = (TextMessage) message;
 
-                data.put("type", "jms");
-                String karafName = System.getProperty("karaf.name");
-                if (karafName != null) {
-                    data.put("karafName", karafName);
+                try {
+                    Map<String, Object> data = new HashMap<>();
+
+                    try {
+                        data.put("hostAddress", InetAddress.getLocalHost().getHostAddress());
+                        data.put("hostName", InetAddress.getLocalHost().getHostName());
+                    } catch (Exception e) {
+                        LOGGER.warn("Can't populate local host name and address", e);
+                    }
+
+                    // custom fields
+                    Enumeration<String> keys = properties.keys();
+                    while (keys.hasMoreElements()) {
+                        String key = keys.nextElement();
+                        data.put(key, properties.get(key));
+                    }
+
+                    ByteArrayInputStream is = new ByteArrayInputStream(textMessage.getText().getBytes());
+                    data.putAll(unmarshaller.unmarshal(is));
+
+                    data.put("type", "jms");
+                    String karafName = System.getProperty("karaf.name");
+                    if (karafName != null) {
+                        data.put("karafName", karafName);
+                    }
+
+                    Event event = new Event(dispatcherTopic, data);
+                    dispatcher.postEvent(event);
+                } catch (Exception e) {
+                    LOGGER.warn("Can't process JMS message", e);
                 }
-
-                Event event = new Event(dispatcherTopic, data);
-                dispatcher.postEvent(event);
-            } catch (Exception e) {
-                LOGGER.warn("Can't process JMS message", e);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/f1409497/collector/jms/src/test/java/org/apache/karaf/decanter/collector/jms/JmsCollectorTest.java
----------------------------------------------------------------------
diff --git a/collector/jms/src/test/java/org/apache/karaf/decanter/collector/jms/JmsCollectorTest.java b/collector/jms/src/test/java/org/apache/karaf/decanter/collector/jms/JmsCollectorTest.java
new file mode 100644
index 0000000..a5e019d
--- /dev/null
+++ b/collector/jms/src/test/java/org/apache/karaf/decanter/collector/jms/JmsCollectorTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.collector.jms;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.apache.karaf.decanter.marshaller.json.JsonUnmarshaller;
+import org.junit.*;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.ComponentInstance;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Properties;
+
+public class JmsCollectorTest {
+
+    private static BrokerService broker;
+
+    private DispatcherMock dispatcher;
+    private ActiveMQConnectionFactory connectionFactory;
+
+    @BeforeClass
+    public static void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
+        broker.addConnector("tcp://localhost:61616");
+        broker.setUseJmx(false);
+        broker.start();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        connectionFactory = new ActiveMQConnectionFactory();
+        connectionFactory.setBrokerURL("tcp://localhost:61616");
+
+        JsonUnmarshaller unmarshaller = new JsonUnmarshaller();
+
+        dispatcher = new DispatcherMock();
+
+        JmsCollector jmsCollector = new JmsCollector();
+        jmsCollector.setUnmarshaller(unmarshaller);
+        jmsCollector.setConnectionFactory(connectionFactory);
+        jmsCollector.setDispatcher(dispatcher);
+
+        ComponentContext componentContext = new ComponentContextMock();
+        componentContext.getProperties().put("destination.name", "decanter");
+        componentContext.getProperties().put("destination.type", "queue");
+
+        jmsCollector.activate(componentContext);
+    }
+
+    @Test
+    public void test() throws Exception {
+        Connection connection = null;
+        Session session = null;
+        try {
+            connection = connectionFactory.createConnection();
+            connection.start();
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(session.createQueue("decanter"));
+            ActiveMQMapMessage mapMessage = new ActiveMQMapMessage();
+            mapMessage.setString("message", "map");
+            producer.send(mapMessage);
+
+            Thread.sleep(200L);
+
+            Assert.assertEquals(1, dispatcher.getPostEvents().size());
+            Event event = dispatcher.getPostEvents().get(0);
+            Assert.assertEquals("map", event.getProperty("message"));
+            Assert.assertEquals("jms", event.getProperty("type"));
+
+            ActiveMQTextMessage textMessage = new ActiveMQTextMessage();
+            textMessage.setText("{ \"message\" : \"text\" }");
+            producer.send(textMessage);
+
+            Thread.sleep(200L);
+
+            Assert.assertEquals(2, dispatcher.getPostEvents().size());
+            event = dispatcher.getPostEvents().get(1);
+            Assert.assertEquals("text", event.getProperty("message"));
+            Assert.assertEquals("jms", event.getProperty("type"));
+        } finally {
+            if (session != null) {
+                session.close();
+            }
+            if (connection != null) {
+                connection.close();
+            }
+        }
+    }
+
+    @AfterClass
+    public static void stopBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    private class ComponentContextMock implements ComponentContext {
+
+        private Properties properties;
+
+        public ComponentContextMock() {
+            this.properties = new Properties();
+        }
+
+        @Override
+        public Dictionary getProperties() {
+            return this.properties;
+        }
+
+        @Override
+        public Object locateService(String s) {
+            return null;
+        }
+
+        @Override
+        public Object locateService(String s, ServiceReference serviceReference) {
+            return null;
+        }
+
+        @Override
+        public Object[] locateServices(String s) {
+            return new Object[0];
+        }
+
+        @Override
+        public BundleContext getBundleContext() {
+            return null;
+        }
+
+        @Override
+        public Bundle getUsingBundle() {
+            return null;
+        }
+
+        @Override
+        public ComponentInstance getComponentInstance() {
+            return null;
+        }
+
+        @Override
+        public void enableComponent(String s) {
+
+        }
+
+        @Override
+        public void disableComponent(String s) {
+
+        }
+
+        @Override
+        public ServiceReference getServiceReference() {
+            return null;
+        }
+    }
+
+    private class DispatcherMock implements EventAdmin {
+
+        private List<Event> postEvents = new ArrayList<>();
+        private List<Event> sendEvents = new ArrayList<>();
+
+        @Override
+        public void postEvent(Event event) {
+            postEvents.add(event);
+        }
+
+        @Override
+        public void sendEvent(Event event) {
+            sendEvents.add(event);
+        }
+
+        public List<Event> getPostEvents() {
+            return postEvents;
+        }
+
+        public List<Event> getSendEvents() {
+            return sendEvents;
+        }
+    }
+
+}


[2/2] karaf-decanter git commit: [KARAF-4706] Add MQTT collector

Posted by jb...@apache.org.
[KARAF-4706] Add MQTT collector


Project: http://git-wip-us.apache.org/repos/asf/karaf-decanter/repo
Commit: http://git-wip-us.apache.org/repos/asf/karaf-decanter/commit/7f5cbf44
Tree: http://git-wip-us.apache.org/repos/asf/karaf-decanter/tree/7f5cbf44
Diff: http://git-wip-us.apache.org/repos/asf/karaf-decanter/diff/7f5cbf44

Branch: refs/heads/master
Commit: 7f5cbf44e40b57bcb08928056a4f4388e7d19f42
Parents: f140949
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Fri Oct 21 08:00:27 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Fri Oct 21 08:04:01 2016 +0200

----------------------------------------------------------------------
 assembly/src/main/feature/feature.xml           |   7 +
 collector/mqtt/LICENSE                          | 475 +++++++++++++++++++
 collector/mqtt/NOTICE                           |  32 ++
 collector/mqtt/pom.xml                          | 103 ++++
 ...org.apache.karaf.decanter.collector.mqtt.cfg |  12 +
 .../decanter/collector/mqtt/MqttCollector.java  | 136 ++++++
 .../collector/mqtt/MqttCollectorTest.java       | 174 +++++++
 collector/pom.xml                               |   1 +
 8 files changed, 940 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/7f5cbf44/assembly/src/main/feature/feature.xml
----------------------------------------------------------------------
diff --git a/assembly/src/main/feature/feature.xml b/assembly/src/main/feature/feature.xml
index 2e460fc..c707b23 100644
--- a/assembly/src/main/feature/feature.xml
+++ b/assembly/src/main/feature/feature.xml
@@ -151,6 +151,13 @@
         <configfile finalname="/etc/org.apache.karaf.decanter.collector.process-jmx.cfg">mvn:org.apache.karaf.decanter.collector/org.apache.karaf.decanter.collector.process/${project.version}/cfg/process</configfile>
     </feature>
 
+    <feature name="decanter-collector-mqtt" version="${project.version}" description="Karaf Decanter MQTT Collector">
+        <feature>decanter-common</feature>
+        <bundle dependency="true">mvn:org.eclipse.paho/org.eclipse.paho.client.mqttv3/${paho.version}</bundle>
+        <bundle>mvn:org.apache.karaf.decanter.collector/org.apache.karaf.decanter.collector.mqtt/${project.version}</bundle>
+        <configfile finalname="/etc/org.apache.karaf.decanter.collector.mqtt.cfg">mvn:org.apache.karaf.decanter.collector/org.apache.karaf.decanter.collector.mqtt/${project.version}/cfg</configfile>
+    </feature>
+
     <feature name="decanter-appender-log" version="${project.version}" description="Karaf Decanter Log Appender">
         <feature>decanter-common</feature>
         <bundle>mvn:org.apache.karaf.decanter.appender/org.apache.karaf.decanter.appender.log/${project.version}</bundle>

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/7f5cbf44/collector/mqtt/LICENSE
----------------------------------------------------------------------
diff --git a/collector/mqtt/LICENSE b/collector/mqtt/LICENSE
new file mode 100644
index 0000000..b36a53d
--- /dev/null
+++ b/collector/mqtt/LICENSE
@@ -0,0 +1,475 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+For asm:
+Copyright (c) 2000-2011 INRIA, France Telecom
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+
+1. Redistributions of source code must retain the above copyright
+   notice, this list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright
+   notice, this list of conditions and the following disclaimer in the
+   documentation and/or other materials provided with the distribution.
+
+3. Neither the name of the copyright holders nor the names of its
+   contributors may be used to endorse or promote products derived from
+   this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+THE POSSIBILITY OF SUCH DAMAGE.
+
+For equinox:
+Eclipse Public License - v 1.0
+
+THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE PUBLIC LICENSE ("AGREEMENT").
+ANY USE, REPRODUCTION OR DISTRIBUTION OF THE PROGRAM CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS AGREEMENT.
+
+1. DEFINITIONS
+
+"Contribution" means:
+
+a) in the case of the initial Contributor, the initial code and documentation distributed under this Agreement, and
+
+b) in the case of each subsequent Contributor:
+
+i) changes to the Program, and
+
+ii) additions to the Program;
+
+where such changes and/or additions to the Program originate from and are distributed by that particular Contributor.
+A Contribution 'originates' from a Contributor if it was added to the Program by such Contributor itself or anyone
+acting on such Contributor's behalf. Contributions do not include additions to the Program which: (i) are separate
+modules of software distributed in conjunction with the Program under their own license agreement, and (ii) are not
+derivative works of the Program.
+
+"Contributor" means any person or entity that distributes the Program.
+
+"Licensed Patents" mean patent claims licensable by a Contributor which are necessarily infringed by the use or sale
+of its Contribution alone or when combined with the Program.
+
+"Program" means the Contributions distributed in accordance with this Agreement.
+
+"Recipient" means anyone who receives the Program under this Agreement, including all Contributors.
+
+2. GRANT OF RIGHTS
+
+a) Subject to the terms of this Agreement, each Contributor hereby grants Recipient a non-exclusive, worldwide,
+royalty-free copyright license to reproduce, prepare derivative works of, publicly display, publicly perform,
+distribute and sublicense the Contribution of such Contributor, if any, and such derivative works, in source code
+and object code form.
+
+b) Subject to the terms of this Agreement, each Contributor hereby grants Recipient a non-exclusive, worldwide,
+royalty-free patent license under Licensed Patents to make, use, sell, offer to sell, import and otherwise transfer
+the Contribution of such Contributor, if any, in source code and object code form. This patent license shall apply to
+the combination of the Contribution and the Program if, at the time the Contribution is added by the Contributor, such
+addition of the Contribution causes such combination to be covered by the Licensed Patents. The patent license shall
+not apply to any other combinations which include the Contribution. No hardware per se is licensed hereunder.
+
+c) Recipient understands that although each Contributor grants the licenses to its Contributions set forth herein, no
+assurances are provided by any Contributor that the Program does not infringe the patent or other intellectual property
+rights of any other entity. Each Contributor disclaims any liability to Recipient for claims brought by any other entity
+based on infringement of intellectual property rights or otherwise. As a condition to exercising the rights and licenses
+granted hereunder, each Recipient hereby assumes sole responsibility to secure any other intellectual property rights
+needed, if any. For example, if a third party patent license is required to allow Recipient to distribute the Program,
+it is Recipient's responsibility to acquire that license before distributing the Program.
+
+d) Each Contributor represents that to its knowledge it has sufficient copyright rights in its Contribution, if any,
+to grant the copyright license set forth in this Agreement.
+
+3. REQUIREMENTS
+
+A Contributor may choose to distribute the Program in object code form under its own license agreement, provided that:
+
+a) it complies with the terms and conditions of this Agreement; and
+
+b) its license agreement:
+
+i) effectively disclaims on behalf of all Contributors all warranties and conditions, express and implied, including
+warranties or conditions of title and non-infringement, and implied warranties or conditions of merchantability and
+fitness for a particular purpose;
+
+ii) effectively excludes on behalf of all Contributors all liability for damages, including direct, indirect, special,
+incidental and consequential damages, such as lost profits;
+
+iii) states that any provisions which differ from this Agreement are offered by that Contributor alone and not by any
+other party; and
+
+iv) states that source code for the Program is available from such Contributor, and informs licensees how to obtain
+it in a reasonable manner on or through a medium customarily used for software exchange.
+
+When the Program is made available in source code form:
+
+a) it must be made available under this Agreement; and
+
+b) a copy of this Agreement must be included with each copy of the Program.
+
+Contributors may not remove or alter any copyright notices contained within the Program.
+
+Each Contributor must identify itself as the originator of its Contribution, if any, in a manner that reasonably
+allows subsequent Recipients to identify the originator of the Contribution.
+
+4. COMMERCIAL DISTRIBUTION
+
+Commercial distributors of software may accept certain responsibilities with respect to end users, business partners
+and the like. While this license is intended to facilitate the commercial use of the Program, the Contributor who
+includes the Program in a commercial product offering should do so in a manner which does not create potential
+liability for other Contributors. Therefore, if a Contributor includes the Program in a commercial product offering,
+such Contributor ("Commercial Contributor") hereby agrees to defend and indemnify every other Contributor
+("Indemnified Contributor") against any losses, damages and costs (collectively "Losses") arising from claims,
+lawsuits and other legal actions brought by a third party against the Indemnified Contributor to the extent caused
+by the acts or omissions of such Commercial Contributor in connection with its distribution of the Program in a
+commercial product offering. The obligations in this section do not apply to any claims or Losses relating to any
+actual or alleged intellectual property infringement. In order to qualify, an Indemnified Contributor must: a) promptly
+notify the Commercial Contributor in writing of such claim, and b) allow the Commercial Contributor to control, and
+cooperate with the Commercial Contributor in, the defense and any related settlement negotiations. The
+Indemnified Contributor may participate in any such claim at its own expense.
+
+For example, a Contributor might include the Program in a commercial product offering, Product X. That Contributor is
+then a Commercial Contributor. If that Commercial Contributor then makes performance claims, or offers warranties
+related to Product X, those performance claims and warranties are such Commercial Contributor's responsibility alone.
+Under this section, the Commercial Contributor would have to defend claims against the other Contributors related to
+those performance claims and warranties, and if a court requires any other Contributor to pay any damages as a result,
+the Commercial Contributor must pay those damages.
+
+5. NO WARRANTY
+
+EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, THE PROGRAM IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OR
+CONDITIONS OF ANY KIND, EITHER EXPRESS OR IMPLIED INCLUDING, WITHOUT LIMITATION, ANY WARRANTIES OR CONDITIONS OF TITLE,
+NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. Each Recipient is solely responsible for
+determining the appropriateness of using and distributing the Program and assumes all risks associated with its
+exercise of rights under this Agreement , including but not limited to the risks and costs of program errors,
+compliance with applicable laws, damage to or loss of data, programs or equipment, and unavailability or interruption
+of operations.
+
+6. DISCLAIMER OF LIABILITY
+
+EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, NEITHER RECIPIENT NOR ANY CONTRIBUTORS SHALL HAVE ANY LIABILITY FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING WITHOUT LIMITATION LOST
+PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OR DISTRIBUTION OF THE PROGRAM OR THE EXERCISE OF ANY RIGHTS
+GRANTED HEREUNDER, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.
+
+7. GENERAL
+
+If any provision of this Agreement is invalid or unenforceable under applicable law, it shall not affect the validity
+or enforceability of the remainder of the terms of this Agreement, and without further action by the parties hereto,
+such provision shall be reformed to the minimum extent necessary to make such provision valid and enforceable.
+
+If Recipient institutes patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit)
+alleging that the Program itself (excluding combinations of the Program with other software or hardware) infringes
+such Recipient's patent(s), then such Recipient's rights granted under Section 2(b) shall terminate as of the date
+such litigation is filed.
+
+All Recipient's rights under this Agreement shall terminate if it fails to comply with any of the material terms or
+conditions of this Agreement and does not cure such failure in a reasonable period of time after becoming aware of
+such noncompliance. If all Recipient's rights under this Agreement terminate, Recipient agrees to cease use and
+distribution of the Program as soon as reasonably practicable. However, Recipient's obligations under this Agreement
+and any licenses granted by Recipient relating to the Program shall continue and survive.
+
+Everyone is permitted to copy and distribute copies of this Agreement, but in order to avoid inconsistency the
+Agreement is copyrighted and may only be modified in the following manner. The Agreement Steward reserves the right
+to publish new versions (including revisions) of this Agreement from time to time. No one other than the Agreement
+Steward has the right to modify this Agreement. The Eclipse Foundation is the initial Agreement Steward. The Eclipse
+Foundation may assign the responsibility to serve as the Agreement Steward to a suitable separate entity. Each new
+version of the Agreement will be given a distinguishing version number. The Program (including Contributions) may
+always be distributed subject to the version of the Agreement under which it was received. In addition, after a new
+version of the Agreement is published, Contributor may elect to distribute the Program (including its Contributions)
+under the new version. Except as expressly stated in Sections 2(a) and 2(b) above, Recipient receives no rights or
+licenses to the intellectual property of any Contributor under this Agreement, whether expressly, by implication,
+estoppel or otherwise. All rights in the Program not expressly granted under this Agreement are reserved.
+
+This Agreement is governed by the laws of the State of New York and the intellectual property laws of the United States
+of America. No party to this Agreement will bring a legal action under this Agreement more than one year after the
+cause of action arose. Each party waives its rights to a jury trial in any resulting litigation.
+
+For Java Service Wrapper:
+Copyright (c) 1999, 2006 Tanuki Software, Inc.
+
+Permission is hereby granted, free of charge, to any person
+obtaining a copy of the Java Service Wrapper and associated
+documentation files (the "Software"), to deal in the Software
+without  restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sub-license,
+and/or sell copies of the Software, and to permit persons to
+whom the Software is furnished to do so, subject to the
+following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NON-INFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+OTHER DEALINGS IN THE SOFTWARE.
+
+
+Portions of the Software have been derived from source code
+developed by Silver Egg Technology under the following license:
+
+BEGIN Silver Egg Techology License -----------------------------------
+
+    Copyright (c) 2001 Silver Egg Technology
+
+    Permission is hereby granted, free of charge, to any person
+    obtaining a copy of this software and associated documentation
+    files (the "Software"), to deal in the Software without
+    restriction, including without limitation the rights to use,
+    copy, modify, merge, publish, distribute, sub-license, and/or
+    sell copies of the Software, and to permit persons to whom the
+    Software is furnished to do so, subject to the following
+    conditions:
+
+    The above copyright notice and this permission notice shall be
+    included in all copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+    EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+    OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+    NON-INFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+    HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+    WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+    OTHER DEALINGS IN THE SOFTWARE.
+
+END Silver Egg Techology License -------------------------------------
+
+For slf4j:
+Copyright (c) 2004-2011 QOS.ch
+All rights reserved.
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/7f5cbf44/collector/mqtt/NOTICE
----------------------------------------------------------------------
diff --git a/collector/mqtt/NOTICE b/collector/mqtt/NOTICE
new file mode 100644
index 0000000..3cf0a31
--- /dev/null
+++ b/collector/mqtt/NOTICE
@@ -0,0 +1,32 @@
+Apache Karaf Decanter
+Copyright 2015 The Apache Software Foundation
+
+I. Included Software
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+Licensed under the Apache License 2.0.
+
+II. Used Software
+
+This product uses software developed at
+The OSGi Alliance (http://www.osgi.org/).
+Copyright (c) OSGi Alliance (2000, 2010).
+Licensed under the Apache License 2.0.
+
+This product uses software developed at
+OPS4J (http://www.ops4j.org/).
+Licensed under the Apache License 2.0.
+
+This product uses software developed at
+SLF4J (http://www.slf4j.org/).
+Licensed under the MIT License.
+
+This product uses software developed at
+JUnit (http://www.junit.org/).
+Licensed under the Eclipse Public License 1.0.
+
+III. License Summary
+- Apache License 2.0
+- MIT License
+- Eclipse Public License 1.0

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/7f5cbf44/collector/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/collector/mqtt/pom.xml b/collector/mqtt/pom.xml
new file mode 100644
index 0000000..d9e2fee
--- /dev/null
+++ b/collector/mqtt/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <!--
+
+        Licensed to the Apache Software Foundation (ASF) under one or more
+        contributor license agreements.  See the NOTICE file distributed with
+        this work for additional information regarding copyright ownership.
+        The ASF licenses this file to You under the Apache License, Version 2.0
+        (the "License"); you may not use this file except in compliance with
+        the License.  You may obtain a copy of the License at
+
+           http://www.apache.org/licenses/LICENSE-2.0
+
+        Unless required by applicable law or agreed to in writing, software
+        distributed under the License is distributed on an "AS IS" BASIS,
+        WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+        See the License for the specific language governing permissions and
+        limitations under the License.
+    -->
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.karaf.decanter</groupId>
+        <artifactId>collector</artifactId>
+        <version>1.2.1-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <groupId>org.apache.karaf.decanter.collector</groupId>
+    <artifactId>org.apache.karaf.decanter.collector.mqtt</artifactId>
+    <packaging>bundle</packaging>
+    <name>Apache Karaf :: Decanter :: Collector :: MQTT</name>
+
+    <dependencies>
+        <!-- Decanter API (provides the marshaller/unmarshaller contracts used by the collector).
+             No version is declared here; presumably it is managed by the parent POM. -->
+        <dependency>
+            <groupId>org.apache.karaf.decanter</groupId>
+            <artifactId>org.apache.karaf.decanter.api</artifactId>
+        </dependency>
+        <!-- Eclipse Paho MQTT v3 client used to connect and subscribe to the broker -->
+        <dependency>
+            <groupId>org.eclipse.paho</groupId>
+            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+            <version>1.0.2</version>
+        </dependency>
+
+        <!-- test: ActiveMQ broker artifacts (broker, MQTT transport, KahaDB store), a simple SLF4J
+             binding and the Decanter JSON marshaller, all limited to the test scope -->
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-broker</artifactId>
+            <version>${activemq.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-mqtt</artifactId>
+            <version>${activemq.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-kahadb-store</artifactId>
+            <version>${activemq.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.karaf.decanter.marshaller</groupId>
+            <artifactId>org.apache.karaf.decanter.marshaller.json</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- During the package phase, attach the default collector configuration file
+                 as an additional artifact of type "cfg" next to the bundle. -->
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>attach-artifact</goal>
+                        </goals>
+                        <configuration>
+                            <artifacts>
+                                <artifact>
+                                    <file>src/main/cfg/org.apache.karaf.decanter.collector.mqtt.cfg</file>
+                                    <type>cfg</type>
+                                </artifact>
+                            </artifacts>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/7f5cbf44/collector/mqtt/src/main/cfg/org.apache.karaf.decanter.collector.mqtt.cfg
----------------------------------------------------------------------
diff --git a/collector/mqtt/src/main/cfg/org.apache.karaf.decanter.collector.mqtt.cfg b/collector/mqtt/src/main/cfg/org.apache.karaf.decanter.collector.mqtt.cfg
new file mode 100644
index 0000000..e31efec
--- /dev/null
+++ b/collector/mqtt/src/main/cfg/org.apache.karaf.decanter.collector.mqtt.cfg
@@ -0,0 +1,12 @@
+#######################################
+# Decanter MQTT Collector Configuration
+#######################################
+
+# URI of the MQTT broker the collector connects to
+server.uri=tcp://localhost:61616
+
+# Client ID the collector uses when connecting to the MQTT broker
+client.id=decanter
+
+# Name of the MQTT topic the collector subscribes to
+topic=decanter

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/7f5cbf44/collector/mqtt/src/main/java/org/apache/karaf/decanter/collector/mqtt/MqttCollector.java
----------------------------------------------------------------------
diff --git a/collector/mqtt/src/main/java/org/apache/karaf/decanter/collector/mqtt/MqttCollector.java b/collector/mqtt/src/main/java/org/apache/karaf/decanter/collector/mqtt/MqttCollector.java
new file mode 100644
index 0000000..6dc39b2
--- /dev/null
+++ b/collector/mqtt/src/main/java/org/apache/karaf/decanter/collector/mqtt/MqttCollector.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.collector.mqtt;
+
+import org.apache.karaf.decanter.api.marshaller.Unmarshaller;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.net.InetAddress;
+import java.util.Dictionary;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+
+@Component(
+    name = "org.apache.karaf.decanter.collector.mqtt",
+    immediate = true
+)
+public class MqttCollector {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(MqttCollector.class);
+
+    private Dictionary<String, Object> properties;
+
+    private MqttClient client;
+    private String dispatcherTopic;
+    private boolean consuming = false;
+
+    private EventAdmin dispatcher;
+    private Unmarshaller unmarshaller;
+
+    @Activate
+    public void activate(ComponentContext componentContext) throws Exception {
+        properties = componentContext.getProperties();
+        String serverUri =  getProperty(properties, "server.uri", "tcp://localhost:61616");
+        String clientId = getProperty(properties, "client.id", "decanter");
+        String topic = getProperty(properties, "topic", "decanter");
+        dispatcherTopic = getProperty(properties, EventConstants.EVENT_TOPIC, "decanter/collect/mqtt/decanter");
+        client = new MqttClient(serverUri, clientId);
+        client.connect();
+        client.subscribe(topic);
+        client.setCallback(new MqttCallback() {
+            @Override
+            public void connectionLost(Throwable cause) {
+                LOGGER.debug("MQTT connection lost", cause);
+            }
+
+            @Override
+            public void messageArrived(String topic, MqttMessage message) {
+                if (message.getPayload() == null) {
+                    return;
+                }
+
+                Map<String, Object> data = new HashMap<>();
+                try {
+                    data.put("hostAddress", InetAddress.getLocalHost().getHostAddress());
+                    data.put("hostName", InetAddress.getLocalHost().getHostName());
+                } catch (Exception e) {
+                    LOGGER.warn("Can't populate local host name and address", e);
+                }
+
+                // custom fields
+                Enumeration<String> keys = properties.keys();
+                while (keys.hasMoreElements()) {
+                    String key = keys.nextElement();
+                    data.put(key, properties.get(key));
+                }
+
+                ByteArrayInputStream is = new ByteArrayInputStream(message.getPayload());
+                data.putAll(unmarshaller.unmarshal(is));
+
+                data.put("type", "mqtt");
+                String karafName = System.getProperty("karaf.name");
+                if (karafName != null) {
+                    data.put("karafName", karafName);
+                }
+                Event event = new Event(dispatcherTopic, data);
+                dispatcher.postEvent(event);
+            }
+
+            @Override
+            public void deliveryComplete(IMqttDeliveryToken token) {
+                // nothing to do
+            }
+        });
+    }
+
+    @Deactivate
+    public void deactivate() throws Exception {
+        if (client != null) {
+            client.disconnect();
+        }
+    }
+
+    private String getProperty(Dictionary<String, Object> properties, String key, String defaultValue) {
+        return (properties.get(key) != null) ? (String) properties.get(key) : defaultValue;
+    }
+
+    @Reference
+    public void setDispatcher(EventAdmin dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    @Reference
+    public void setUnmarshaller(Unmarshaller unmarshaller) {
+        this.unmarshaller = unmarshaller;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/7f5cbf44/collector/mqtt/src/test/java/org/apache/karaf/decanter/collector/mqtt/MqttCollectorTest.java
----------------------------------------------------------------------
diff --git a/collector/mqtt/src/test/java/org/apache/karaf/decanter/collector/mqtt/MqttCollectorTest.java b/collector/mqtt/src/test/java/org/apache/karaf/decanter/collector/mqtt/MqttCollectorTest.java
new file mode 100644
index 0000000..c3e0869
--- /dev/null
+++ b/collector/mqtt/src/test/java/org/apache/karaf/decanter/collector/mqtt/MqttCollectorTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.collector.mqtt;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.apache.karaf.decanter.marshaller.json.JsonUnmarshaller;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.ComponentInstance;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
+
+/**
+ * Tests the MQTT collector end-to-end: a JSON message published on an embedded ActiveMQ broker
+ * (MQTT connector on localhost:11883) must be posted to the dispatcher as an event.
+ */
+public class MqttCollectorTest {
+
+    /** Maximum time to wait for the published message to reach the dispatcher, in milliseconds. */
+    private static final long EVENT_TIMEOUT_MS = 5000L;
+
+    private static BrokerService broker;
+
+    @BeforeClass
+    public static void startBroker() throws Exception {
+        // in-memory broker without JMX, exposing a MQTT connector for the test
+        broker = new BrokerService();
+        broker.setUseJmx(false);
+        broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
+        broker.addConnector(new URI("mqtt://localhost:11883"));
+        broker.start();
+    }
+
+    @Test
+    public void sendDecanterMessage() throws Exception {
+        DispatcherMock dispatcherMock = new DispatcherMock();
+        ComponentContext componentContext = new ComponentContextMock();
+        componentContext.getProperties().put("server.uri", "tcp://localhost:11883");
+        componentContext.getProperties().put("client.id", "decanter");
+        componentContext.getProperties().put("topic", "decanter");
+        JsonUnmarshaller unmarshaller = new JsonUnmarshaller();
+        MqttCollector collector = new MqttCollector();
+        collector.setDispatcher(dispatcherMock);
+        collector.setUnmarshaller(unmarshaller);
+        collector.activate(componentContext);
+        try {
+            publish("decanter", "{ \"foo\" : \"bar\" }");
+
+            // the event is delivered asynchronously: poll with an upper bound instead of
+            // relying on a fixed sleep, which is either too short (flaky) or needlessly long
+            long deadline = System.currentTimeMillis() + EVENT_TIMEOUT_MS;
+            while (dispatcherMock.getPostEvents().isEmpty() && System.currentTimeMillis() < deadline) {
+                Thread.sleep(20L);
+            }
+
+            Assert.assertEquals(1, dispatcherMock.getPostEvents().size());
+            Event event = dispatcherMock.getPostEvents().get(0);
+            Assert.assertEquals("bar", event.getProperty("foo"));
+            Assert.assertEquals("mqtt", event.getProperty("type"));
+        } finally {
+            // always release the collector's MQTT client, even when an assertion fails
+            collector.deactivate();
+        }
+    }
+
+    @AfterClass
+    public static void stopBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    /**
+     * Publishes {@code payload}, encoded as UTF-8, on {@code topic} with a short-lived MQTT client.
+     */
+    private static void publish(String topic, String payload) throws Exception {
+        MqttClient mqttClient = new MqttClient("tcp://localhost:11883", "client");
+        try {
+            mqttClient.connect();
+            MqttMessage message = new MqttMessage();
+            // explicit charset: the payload must not depend on the platform default encoding
+            message.setPayload(payload.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+            mqttClient.publish(topic, message);
+        } finally {
+            if (mqttClient.isConnected()) {
+                mqttClient.disconnect();
+            }
+            mqttClient.close();
+        }
+    }
+
+    /**
+     * Minimal {@link ComponentContext} exposing only a mutable properties dictionary;
+     * every other operation is unsupported.
+     */
+    private static class ComponentContextMock implements ComponentContext {
+
+        private final Dictionary<String, Object> properties = new Hashtable<>();
+
+        @Override
+        public Dictionary getProperties() {
+            return properties;
+        }
+
+        @Override
+        public Object locateService(String name) {
+            throw new UnsupportedOperationException("Unimplemented method");
+        }
+
+        @Override
+        public Object locateService(String name, ServiceReference reference) {
+            throw new UnsupportedOperationException("Unimplemented method");
+        }
+
+        @Override
+        public Object[] locateServices(String name) {
+            throw new UnsupportedOperationException("Unimplemented method");
+        }
+
+        @Override
+        public BundleContext getBundleContext() {
+            throw new UnsupportedOperationException("Unimplemented method");
+        }
+
+        @Override
+        public Bundle getUsingBundle() {
+            throw new UnsupportedOperationException("Unimplemented method");
+        }
+
+        @Override
+        public ComponentInstance getComponentInstance() {
+            throw new UnsupportedOperationException("Unimplemented method");
+        }
+
+        @Override
+        public void enableComponent(String name) {
+            throw new UnsupportedOperationException("Unimplemented method");
+        }
+
+        @Override
+        public void disableComponent(String name) {
+            throw new UnsupportedOperationException("Unimplemented method");
+        }
+
+        @Override
+        public ServiceReference getServiceReference() {
+            throw new UnsupportedOperationException("Unimplemented method");
+        }
+
+    }
+
+    /**
+     * {@link EventAdmin} that records the events it receives. The lists are thread-safe because
+     * the events are added by the collector while the test thread reads them.
+     */
+    private static class DispatcherMock implements EventAdmin {
+        private final List<Event> postEvents = new java.util.concurrent.CopyOnWriteArrayList<>();
+        private final List<Event> sendEvents = new java.util.concurrent.CopyOnWriteArrayList<>();
+
+        @Override
+        public void postEvent(Event event) {
+            postEvents.add(event);
+        }
+
+        @Override
+        public void sendEvent(Event event) {
+            sendEvents.add(event);
+        }
+
+        public List<Event> getPostEvents() {
+            return postEvents;
+        }
+
+        public List<Event> getSendEvents() {
+            return sendEvents;
+        }
+
+        public void reset() {
+            postEvents.clear();
+            sendEvents.clear();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/7f5cbf44/collector/pom.xml
----------------------------------------------------------------------
diff --git a/collector/pom.xml b/collector/pom.xml
index cf4614f..0c75412 100644
--- a/collector/pom.xml
+++ b/collector/pom.xml
@@ -47,6 +47,7 @@
         <module>jms</module>
         <module>process</module>
         <module>socket</module>
+        <module>mqtt</module>
     </modules>
 
 </project>