You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2019/10/02 12:28:21 UTC

[plc4x] branch develop updated (d2128c1 -> 987c360)

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git.


    from d2128c1  - Migrated the logstash plugin to use the shade plugin instead of the assembly plugin
     new 34995ac  - Bumped the version of the build-tools code-generation to the just released version - Bumped the versions of the calcite, camel, karaf versions to the latest versions
     new 987c360  - Continued working on the BACnetIP driver - Added a StreamPipes module for using the BACnetIP driver as source in StreamPipes

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 plc4j/integrations/apache-calcite/pom.xml          |   2 +-
 plc4j/integrations/apache-camel/pom.xml            |   2 +-
 plc4j/integrations/apache-karaf/pom.xml            |   2 +-
 pom.xml                                            |  16 +-
 sandbox/pom.xml                                    |   2 +
 sandbox/streampipes-connectors/pom.xml             | 187 +++++++++++++++++++
 .../java/streampipes/bacnetip/BacNetIpAdapter.java | 207 +++++++++++++++++++++
 .../streampipes/bacnetip/config/ConfigKeys.java    |  30 +++
 .../bacnetip/config/ConnectWorkerConfig.java       |  84 +++++++++
 sandbox/test-java-bacnetip-driver/pom.xml          |   4 +-
 .../plc4x/java/bacnetip/PassiveBacNetIpDriver.java |   3 +-
 .../connection/PassiveBacNetIpPlcConnection.java   |  11 +-
 .../java/bacnetip/PassiveBacNetIpDriverManual.java |   3 +-
 13 files changed, 536 insertions(+), 17 deletions(-)
 create mode 100644 sandbox/streampipes-connectors/pom.xml
 create mode 100644 sandbox/streampipes-connectors/src/main/java/org/apache/plc4x/java/streampipes/bacnetip/BacNetIpAdapter.java
 create mode 100644 sandbox/streampipes-connectors/src/main/java/org/apache/plc4x/java/streampipes/bacnetip/config/ConfigKeys.java
 create mode 100644 sandbox/streampipes-connectors/src/main/java/org/apache/plc4x/java/streampipes/bacnetip/config/ConnectWorkerConfig.java


[plc4x] 01/02: - Bumped the version of the build-tools code-generation to the just released version - Bumped the versions of the calcite, camel, karaf versions to the latest versions

Posted by cd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 34995ac707f479fedc3e45077cc74a0727c3f23f
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Wed Oct 2 14:27:04 2019 +0200

    - Bumped the version of the build-tools code-generation to the just released version
    - Bumped the versions of the calcite, camel, karaf versions to the latest versions
---
 plc4j/integrations/apache-calcite/pom.xml |  2 +-
 plc4j/integrations/apache-camel/pom.xml   |  2 +-
 plc4j/integrations/apache-karaf/pom.xml   |  2 +-
 pom.xml                                   | 16 ++++++++++------
 4 files changed, 13 insertions(+), 9 deletions(-)

diff --git a/plc4j/integrations/apache-calcite/pom.xml b/plc4j/integrations/apache-calcite/pom.xml
index b0cce04..3872324 100644
--- a/plc4j/integrations/apache-calcite/pom.xml
+++ b/plc4j/integrations/apache-calcite/pom.xml
@@ -31,7 +31,7 @@
   <name>PLC4J: Integrations: Apache Calcite</name>
 
   <properties>
-    <calcite-core.version>1.17.0</calcite-core.version>
+    <calcite-core.version>1.21.0</calcite-core.version>
   </properties>
 
   <dependencies>
diff --git a/plc4j/integrations/apache-camel/pom.xml b/plc4j/integrations/apache-camel/pom.xml
index 176af17..3d1847a 100644
--- a/plc4j/integrations/apache-camel/pom.xml
+++ b/plc4j/integrations/apache-camel/pom.xml
@@ -33,7 +33,7 @@
   <description>Integration module for integrating PLC4X into Apache Camel.</description>
 
   <properties>
-    <camel.version>2.23.1</camel.version>
+    <camel.version>2.24.2</camel.version>
   </properties>
 
   <dependencies>
diff --git a/plc4j/integrations/apache-karaf/pom.xml b/plc4j/integrations/apache-karaf/pom.xml
index b3b5227..093431b 100644
--- a/plc4j/integrations/apache-karaf/pom.xml
+++ b/plc4j/integrations/apache-karaf/pom.xml
@@ -35,7 +35,7 @@
   <description>Integration module for integrating PLC4X into Apache Karaf.</description>
 
   <properties>
-    <karaf.version>4.2.2</karaf.version>
+    <karaf.version>4.2.7</karaf.version>
     <karaf-maven-plugin.version>${karaf.version}</karaf-maven-plugin.version>
   </properties>
 
diff --git a/pom.xml b/pom.xml
index 2d99014..52950de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,9 +102,7 @@
     <!-- Exclude all generated code -->
     <sonar.exclusions>**/generated-sources</sonar.exclusions>
 
-
-    <!--plc4x-code-generation.version>1.0.0</plc4x-code-generation.version-->
-    <plc4x-code-generation.version>1.0.0-SNAPSHOT</plc4x-code-generation.version>
+    <plc4x-code-generation.version>1.0.0</plc4x-code-generation.version>
 
     <antlr.version>4.7.2</antlr.version>
     <asm.version>5.0.4</asm.version>
@@ -126,7 +124,7 @@
     <commons-pool2.version>2.6.0</commons-pool2.version>
     <commons-text.version>1.8</commons-text.version>
     <crc.version>1.0.1</crc.version>
-    <elasticsearch.version>7.2.0</elasticsearch.version>
+    <elasticsearch.version>7.4.0</elasticsearch.version>
     <equalsverifier.version>3.0.2</equalsverifier.version>
     <findbugs.version>3.0.1</findbugs.version>
     <freemarker.version>2.3.28</freemarker.version>
@@ -134,6 +132,7 @@
     <gson.version>2.8.5</gson.version>
     <guava.version>27.0.1-jre</guava.version>
     <hamcrest.version>1.3</hamcrest.version>
+    <httpclient.version>4.5.10</httpclient.version>
     <jackson.version>2.9.9</jackson.version>
     <jmh.version>1.21</jmh.version>
     <jna.version>5.3.1</jna.version>
@@ -145,7 +144,7 @@
     <junit.version>4.12</junit.version>
     <log4j.version>2.11.1</log4j.version>
     <logback.version>1.2.3</logback.version>
-    <logstash.version>7.3.0</logstash.version>
+    <logstash.version>7.4.0</logstash.version>
     <lucene.version>8.0.0</lucene.version>
     <metrics-core.version>3.1.2</metrics-core.version>
     <mockito.version>2.24.5</mockito.version>
@@ -329,6 +328,11 @@
         <version>${netty.version}</version>
       </dependency>
       <dependency>
+        <groupId>javax.annotation</groupId>
+        <artifactId>javax.annotation-api</artifactId>
+        <version>1.3.2</version>
+      </dependency>
+      <dependency>
         <groupId>javax.xml.bind</groupId>
         <artifactId>jaxb-api</artifactId>
         <version>2.3.1</version>
@@ -422,7 +426,7 @@
       <dependency>
         <groupId>org.apache.httpcomponents</groupId>
         <artifactId>httpcore</artifactId>
-        <version>4.4.10</version>
+        <version>4.4.12</version>
       </dependency>
       <dependency>
         <groupId>org.apache.logging.log4j</groupId>


[plc4x] 02/02: - Continued working on the BACnetIP driver - Added a StreamPipes module for using the BACnetIP driver as source in StreamPipes

Posted by cd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 987c360568d76fe1041e2cf0e3d8fb83cf221f01
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Wed Oct 2 14:28:14 2019 +0200

    - Continued working on the BACnetIP driver
    - Added a StreamPipes module for using the BACnetIP driver as source in StreamPipes
---
 sandbox/pom.xml                                    |   2 +
 sandbox/streampipes-connectors/pom.xml             | 187 +++++++++++++++++++
 .../java/streampipes/bacnetip/BacNetIpAdapter.java | 207 +++++++++++++++++++++
 .../streampipes/bacnetip/config/ConfigKeys.java    |  30 +++
 .../bacnetip/config/ConnectWorkerConfig.java       |  84 +++++++++
 sandbox/test-java-bacnetip-driver/pom.xml          |   4 +-
 .../plc4x/java/bacnetip/PassiveBacNetIpDriver.java |   3 +-
 .../connection/PassiveBacNetIpPlcConnection.java   |  11 +-
 .../java/bacnetip/PassiveBacNetIpDriverManual.java |   3 +-
 9 files changed, 523 insertions(+), 8 deletions(-)

diff --git a/sandbox/pom.xml b/sandbox/pom.xml
index 72d06a2..60a9392 100644
--- a/sandbox/pom.xml
+++ b/sandbox/pom.xml
@@ -36,6 +36,8 @@
 
   <modules>
     <module>code-gen</module>
+    <module>streampipes-connectors</module>
+
     <module>test-java-ab-eth-driver</module>
     <module>test-java-bacnetip-driver</module>
     <module>test-java-knxnetip-driver</module>
diff --git a/sandbox/streampipes-connectors/pom.xml b/sandbox/streampipes-connectors/pom.xml
new file mode 100644
index 0000000..1e0ad2d
--- /dev/null
+++ b/sandbox/streampipes-connectors/pom.xml
@@ -0,0 +1,187 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.plc4x.sandbox</groupId>
+    <artifactId>plc4x-sandbox</artifactId>
+    <version>0.5.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>test-streampipes-connectors</artifactId>
+
+  <name>Sandbox: StreamPipes Connectors</name>
+
+  <properties>
+    <streampipes.version>0.64.0</streampipes.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-protocol-driver-base</artifactId>
+      <version>0.5.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-protocol-driver-base-raw-socket</artifactId>
+      <version>0.5.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-protocol-driver-base-pcap-socket</artifactId>
+      <version>0.5.0-SNAPSHOT</version>
+      <!--scope>test</scope-->
+    </dependency>
+
+    <dependency>
+      <groupId>org.streampipes</groupId>
+      <artifactId>streampipes-connect-container-worker</artifactId>
+      <version>${streampipes.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.jboss.spec.javax.annotation</groupId>
+          <artifactId>jboss-annotations-api_1.2_spec</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.glassfish.hk2.external</groupId>
+          <artifactId>javax.inject</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.glassfish.hk2.external</groupId>
+          <artifactId>aopalliance-repackaged</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>aopalliance</groupId>
+          <artifactId>aopalliance</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.glassfish</groupId>
+          <artifactId>javax.el</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.ow2.spec.ee</groupId>
+          <artifactId>ow2-jpa-1.0-spec</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.code.findbugs</groupId>
+          <artifactId>jsr305</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.plc4x.sandbox</groupId>
+      <artifactId>test-java-bacnetip-driver</artifactId>
+      <version>0.5.0-SNAPSHOT</version>
+    </dependency>
+  </dependencies>
+
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>commons-beanutils</groupId>
+        <artifactId>commons-beanutils</artifactId>
+        <version>1.9.4</version>
+      </dependency>
+
+      <dependency>
+        <groupId>com.fasterxml.jackson.datatype</groupId>
+        <artifactId>jackson-datatype-jdk8</artifactId>
+        <version>2.9.9</version>
+      </dependency>
+
+      <dependency>
+        <groupId>com.squareup.okhttp3</groupId>
+        <artifactId>okhttp</artifactId>
+        <version>3.12.1</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.httpcomponents</groupId>
+        <artifactId>fluent-hc</artifactId>
+        <version>${httpclient.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.httpcomponents</groupId>
+        <artifactId>httpclient</artifactId>
+        <version>${httpclient.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.javassist</groupId>
+        <artifactId>javassist</artifactId>
+        <version>3.25.0-GA</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.jboss.logging</groupId>
+        <artifactId>jboss-logging</artifactId>
+        <version>3.3.2.Final</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.objenesis</groupId>
+        <artifactId>objenesis</artifactId>
+        <version>2.5.1</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.springframework</groupId>
+        <artifactId>spring-aop</artifactId>
+        <version>5.1.9.RELEASE</version>
+      </dependency>
+      <dependency>
+        <groupId>org.springframework</groupId>
+        <artifactId>spring-beans</artifactId>
+        <version>5.1.9.RELEASE</version>
+      </dependency>
+      <dependency>
+        <groupId>org.springframework</groupId>
+        <artifactId>spring-context</artifactId>
+        <version>5.1.9.RELEASE</version>
+      </dependency>
+      <dependency>
+        <groupId>org.springframework</groupId>
+        <artifactId>spring-core</artifactId>
+        <version>5.1.9.RELEASE</version>
+      </dependency>
+      <dependency>
+        <groupId>org.springframework</groupId>
+        <artifactId>spring-web</artifactId>
+        <version>5.1.9.RELEASE</version>
+      </dependency>
+
+      <dependency>
+        <groupId>javax.validation</groupId>
+        <artifactId>validation-api</artifactId>
+        <version>1.1.0.Final</version>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
+</project>
diff --git a/sandbox/streampipes-connectors/src/main/java/org/apache/plc4x/java/streampipes/bacnetip/BacNetIpAdapter.java b/sandbox/streampipes-connectors/src/main/java/org/apache/plc4x/java/streampipes/bacnetip/BacNetIpAdapter.java
new file mode 100644
index 0000000..0068dee
--- /dev/null
+++ b/sandbox/streampipes-connectors/src/main/java/org/apache/plc4x/java/streampipes/bacnetip/BacNetIpAdapter.java
@@ -0,0 +1,207 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.java.streampipes.bacnetip;
+
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.bacnetip.PassiveBacNetIpDriver;
+import org.apache.plc4x.java.bacnetip.connection.PassiveBacNetIpPlcConnection;
+import org.apache.plc4x.java.bacnetip.readwrite.*;
+import org.apache.plc4x.java.base.PlcMessageToMessageCodec;
+import org.apache.plc4x.java.base.connection.NettyPlcConnection;
+import org.apache.plc4x.java.base.connection.PcapChannelFactory;
+import org.apache.plc4x.java.base.messages.PlcRequestContainer;
+import org.apache.plc4x.java.streampipes.bacnetip.config.ConnectWorkerConfig;
+import org.apache.plc4x.java.utils.pcapsockets.netty.PcapSocketAddress;
+import org.apache.plc4x.java.utils.pcapsockets.netty.PcapSocketChannelConfig;
+import org.apache.plc4x.java.utils.pcapsockets.netty.UdpIpPacketHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.streampipes.connect.adapter.Adapter;
+import org.streampipes.connect.adapter.exception.AdapterException;
+import org.streampipes.connect.adapter.exception.ParseException;
+import org.streampipes.connect.adapter.model.specific.SpecificDataStreamAdapter;
+import org.streampipes.connect.container.worker.init.AdapterWorkerContainer;
+import org.streampipes.connect.init.AdapterDeclarerSingleton;
+import org.streampipes.model.AdapterType;
+import org.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
+import org.streampipes.model.connect.guess.GuessSchema;
+import org.streampipes.model.schema.EventProperty;
+import org.streampipes.model.schema.EventSchema;
+import org.streampipes.sdk.builder.PrimitivePropertyBuilder;
+import org.streampipes.sdk.builder.adapter.SpecificDataStreamAdapterBuilder;
+import org.streampipes.sdk.utils.Datatypes;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class BacNetIpAdapter extends SpecificDataStreamAdapter {
+
+    public static final String ID = "http://plc4x.apache.org/streampipes/adapter/bacnetip";
+    private static final Logger logger = LoggerFactory.getLogger(BacNetIpAdapter.class);
+
+    private NettyPlcConnection connection;
+
+    private Map<String, Object> event;
+    private int numberProperties;
+
+    public BacNetIpAdapter() {
+        event = new HashMap<>();
+        numberProperties = 0;
+    }
+
+    public BacNetIpAdapter(SpecificAdapterStreamDescription adapterDescription) {
+        super(adapterDescription);
+        event = new HashMap<>();
+        numberProperties = 0;
+    }
+
+    @Override
+    public SpecificAdapterStreamDescription declareModel() {
+        SpecificAdapterStreamDescription description = SpecificDataStreamAdapterBuilder.create(ID, "BACnet/IP", "")
+            .iconUrl("bacnetip.png")
+            .category(AdapterType.Manufacturing)
+            .build();
+        description.setAppId(ID);
+        return description;
+    }
+
+    @Override
+    public GuessSchema getSchema(SpecificAdapterStreamDescription specificAdapterStreamDescription) throws AdapterException, ParseException {
+        EventSchema eventSchema = new EventSchema();
+        List<EventProperty> allProperties = new ArrayList<>();
+
+        allProperties.add(
+            PrimitivePropertyBuilder
+                .create(Datatypes.String, "sourceId")
+                .label("Source Id")
+                .description("")
+                .build());
+
+        allProperties.add(
+            PrimitivePropertyBuilder
+                .create(Datatypes.String, "propertyId")
+                .label("Property Id")
+                .description("")
+                .build());
+
+        // We need to define the type of the value, I choose a numerical value
+        allProperties.add(
+            PrimitivePropertyBuilder
+                .create(Datatypes.Float, "value")
+                .label("Value")
+                .description("")
+                .build());
+
+        eventSchema.setEventProperties(allProperties);
+
+        GuessSchema guessSchema = new GuessSchema();
+        guessSchema.setEventSchema(eventSchema);
+        guessSchema.setPropertyProbabilityList(new ArrayList<>());
+
+        return guessSchema;
+    }
+
+    @Override
+    public void startAdapter() throws AdapterException {
+        try {
+            connection = new PassiveBacNetIpPlcConnection(new PcapChannelFactory(
+                //new File("/Users/christofer.dutz/Projects/Apache/PLC4X-Documents/BacNET/Captures/Merck/BACnetWhoIsRouterToNetwork.pcapng"), null,
+                new File("/Users/christofer.dutz/Downloads/20190906_udp.pcapng"), null,
+                PassiveBacNetIpDriver.BACNET_IP_PORT, PcapSocketAddress.ALL_PROTOCOLS,
+                PcapSocketChannelConfig.NO_THROTTLING, new UdpIpPacketHandler()), "",
+                new PlcMessageToMessageCodec<BVLC, PlcRequestContainer>() {
+
+                @Override
+                protected void decode(ChannelHandlerContext channelHandlerContext, BVLC packet, List<Object> list) throws Exception {
+                    final NPDU npdu = ((BVLCOriginalUnicastNPDU) packet).getNpdu();
+                    final APDU apdu = npdu.getApdu();
+                    if(apdu instanceof APDUConfirmedRequest) {
+                        APDUConfirmedRequest request = (APDUConfirmedRequest) apdu;
+                        final BACnetConfirmedServiceRequest serviceRequest = request.getServiceRequest();
+                        if(serviceRequest instanceof BACnetConfirmedServiceRequestConfirmedCOVNotification) {
+                            BACnetConfirmedServiceRequestConfirmedCOVNotification covNotification = (BACnetConfirmedServiceRequestConfirmedCOVNotification) serviceRequest;
+                            final BACnetTagWithContent[] notifications = covNotification.getNotifications();
+
+                            // TODO: Get the information from the decoded packet.
+                            String key = ""; // Node-id + property-id
+                            Float value = 1.0f; // Value
+
+                            event.put(key, value);
+                            if (event.keySet().size() >= numberProperties) {
+                                adapterPipeline.process(event);
+                            }
+
+                            System.out.println("Simple-ACK(" + request.getInvokeId() + "): Confirmed COV Notification [" + notifications.length + "]");
+                        }
+                    }
+                }
+
+                @Override
+                protected void encode(ChannelHandlerContext ctx, PlcRequestContainer msg, List<Object> out) throws Exception {
+                    // Ignore this as we don't send anything.
+                }
+            });
+            connection.connect();
+        } catch (PlcConnectionException e) {
+            logger.error("An error occurred starting the BACnet/IP driver", e);
+            throw new AdapterException("An error occurred starting the BACnet/IP driver");
+        }
+    }
+
+    @Override
+    public void stopAdapter() throws AdapterException {
+        if(connection != null) {
+            try {
+                connection.close();
+            } catch (PlcConnectionException e) {
+                logger.error("An error occurred stopping the BACnet/IP driver", e);
+                throw new AdapterException("An error occurred stopping the BACnet/IP driver");
+            }
+        }
+    }
+
+    @Override
+    public Adapter getInstance(SpecificAdapterStreamDescription specificAdapterStreamDescription) {
+        return new BacNetIpAdapter(adapterDescription);
+    }
+
+    @Override
+    public String getId() {
+        return ID;
+    }
+
+    public static class BacNetIpAdapterInit extends AdapterWorkerContainer {
+        public static void main(String[] args) {
+            AdapterDeclarerSingleton
+                .getInstance()
+                .add(new BacNetIpAdapter());
+
+            String workerUrl = ConnectWorkerConfig.INSTANCE.getConnectContainerWorkerUrl();
+            String masterUrl = ConnectWorkerConfig.INSTANCE.getConnectContainerMasterUrl();
+            Integer workerPort = ConnectWorkerConfig.INSTANCE.getConnectContainerWorkerPort();
+
+            new BacNetIpAdapterInit().init(workerUrl, masterUrl, workerPort);
+        }
+    }
+
+}
diff --git a/sandbox/streampipes-connectors/src/main/java/org/apache/plc4x/java/streampipes/bacnetip/config/ConfigKeys.java b/sandbox/streampipes-connectors/src/main/java/org/apache/plc4x/java/streampipes/bacnetip/config/ConfigKeys.java
new file mode 100644
index 0000000..ccdf39b
--- /dev/null
+++ b/sandbox/streampipes-connectors/src/main/java/org/apache/plc4x/java/streampipes/bacnetip/config/ConfigKeys.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2019 FZI Forschungszentrum Informatik
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.plc4x.java.streampipes.bacnetip.config;
+
+public class ConfigKeys {
+    final static String KAFKA_HOST = "SP_KAFKA_HOST";
+    final static String KAFKA_PORT = "SP_KAFKA_PORT";
+
+    final static String CONNECT_CONTAINER_WORKER_HOST = "SP_CONNECT_CONTAINER_WORKER_HOST";
+    final static String CONNECT_CONTAINER_WORKER_PORT = "SP_CONNECT_CONTAINER_WORKER_PORT";
+
+    final static String CONNECT_CONTAINER_MASTER_HOST = "SP_CONNECT_CONTAINER_MASTER_HOST";
+    final static String CONNECT_CONTAINER_MASTER_PORT = "SP_CONNECT_CONTAINER_MASTER_PORT";
+
+}
diff --git a/sandbox/streampipes-connectors/src/main/java/org/apache/plc4x/java/streampipes/bacnetip/config/ConnectWorkerConfig.java b/sandbox/streampipes-connectors/src/main/java/org/apache/plc4x/java/streampipes/bacnetip/config/ConnectWorkerConfig.java
new file mode 100644
index 0000000..64357f0
--- /dev/null
+++ b/sandbox/streampipes-connectors/src/main/java/org/apache/plc4x/java/streampipes/bacnetip/config/ConnectWorkerConfig.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2019 FZI Forschungszentrum Informatik
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.plc4x.java.streampipes.bacnetip.config;
+
+import org.streampipes.config.SpConfig;
+import org.streampipes.connect.init.Config;
+
+public enum ConnectWorkerConfig {
+
+    INSTANCE;
+
+    private SpConfig config;
+
+    ConnectWorkerConfig() {
+        String name = "bacnetip-connect-worker-main";
+        config = SpConfig.getSpConfig(name);
+
+        config.register(ConfigKeys.KAFKA_HOST, "kafka", "Hostname for backend service for kafka");
+        config.register(ConfigKeys.KAFKA_PORT, 9092, "Port for backend service for kafka");
+
+        config.register(ConfigKeys.CONNECT_CONTAINER_WORKER_PORT, Config.WORKER_PORT, "The port of the connect container");
+        config.register(ConfigKeys.CONNECT_CONTAINER_WORKER_HOST, name, "The hostname of the connect container");
+
+        config.register(ConfigKeys.CONNECT_CONTAINER_MASTER_PORT, Config.MASTER_PORT, "The port of the connect container");
+        config.register(ConfigKeys.CONNECT_CONTAINER_MASTER_HOST, Config.MASTER_HOST, "The hostname of the connect container");
+
+    }
+
+    public String getConnectContainerWorkerUrl() {
+        return "http://" + config.getString(ConfigKeys.CONNECT_CONTAINER_WORKER_HOST) + ":" + config.getInteger(ConfigKeys.CONNECT_CONTAINER_WORKER_PORT) + "/";
+    }
+
+    public String getConnectContainerMasterUrl() {
+        return "http://" + config.getString(ConfigKeys.CONNECT_CONTAINER_MASTER_HOST) + ":" + config.getInteger(ConfigKeys.CONNECT_CONTAINER_MASTER_PORT) + "/";
+    }
+
+    public String getKafkaHost() {
+        return config.getString(ConfigKeys.KAFKA_HOST);
+    }
+
+    public void setKafkaHost(String s) {
+        config.setString(ConfigKeys.KAFKA_HOST, s);
+    }
+
+    public int getKafkaPort() {
+        return config.getInteger(ConfigKeys.KAFKA_PORT);
+    }
+
+    public String getKafkaUrl() {
+        return getKafkaHost() + ":" + getKafkaPort();
+    }
+
+    public String getConnectContainerWorkerHost() {
+        return config.getString(ConfigKeys.CONNECT_CONTAINER_WORKER_HOST);
+    }
+
+    public Integer getConnectContainerWorkerPort() {
+        return config.getInteger(ConfigKeys.CONNECT_CONTAINER_WORKER_PORT);
+    }
+
+    public String getConnectContainerMasterHost() {
+        return config.getString(ConfigKeys.CONNECT_CONTAINER_MASTER_HOST);
+    }
+
+    public Integer getConnectContainerMasterPort() {
+        return config.getInteger(ConfigKeys.CONNECT_CONTAINER_MASTER_PORT);
+    }
+
+}
diff --git a/sandbox/test-java-bacnetip-driver/pom.xml b/sandbox/test-java-bacnetip-driver/pom.xml
index 793ac24..137fae5 100644
--- a/sandbox/test-java-bacnetip-driver/pom.xml
+++ b/sandbox/test-java-bacnetip-driver/pom.xml
@@ -56,7 +56,6 @@
   </build>
 
   <dependencies>
-
     <dependency>
       <groupId>org.apache.plc4x</groupId>
       <artifactId>plc4j-api</artifactId>
@@ -77,6 +76,7 @@
       <artifactId>plc4j-protocol-driver-base-raw-socket</artifactId>
       <version>0.5.0-SNAPSHOT</version>
     </dependency>
+
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
@@ -92,7 +92,7 @@
       <groupId>org.apache.plc4x</groupId>
       <artifactId>plc4j-protocol-driver-base-pcap-socket</artifactId>
       <version>0.5.0-SNAPSHOT</version>
-      <scope>test</scope>
+      <!--scope>test</scope-->
     </dependency>
     <dependency>
       <groupId>ch.qos.logback</groupId>
diff --git a/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/PassiveBacNetIpDriver.java b/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/PassiveBacNetIpDriver.java
index 48916f8..4041640 100644
--- a/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/PassiveBacNetIpDriver.java
+++ b/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/PassiveBacNetIpDriver.java
@@ -22,6 +22,7 @@ import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.authentication.PlcAuthentication;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.bacnetip.connection.PassiveBacNetIpPlcConnection;
+import org.apache.plc4x.java.bacnetip.protocol.HelloWorldProtocol;
 import org.apache.plc4x.java.spi.PlcDriver;
 import org.apache.plc4x.java.utils.rawsockets.netty.RawSocketIpAddress;
 
@@ -61,7 +62,7 @@ public class PassiveBacNetIpDriver implements PlcDriver {
         try {
             RawSocketIpAddress rawSocketAddress = new RawSocketIpAddress(
                 networkDevice, ALL_PROTOCOLS, null, BACNET_IP_PORT);
-            return new PassiveBacNetIpPlcConnection(rawSocketAddress, params);
+            return new PassiveBacNetIpPlcConnection(rawSocketAddress, params, new HelloWorldProtocol());
         } catch (Exception e) {
             throw new PlcConnectionException("Error connecting to host", e);
         }
diff --git a/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/connection/PassiveBacNetIpPlcConnection.java b/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/connection/PassiveBacNetIpPlcConnection.java
index c15bac4..50fdba8 100644
--- a/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/connection/PassiveBacNetIpPlcConnection.java
+++ b/sandbox/test-java-bacnetip-driver/src/main/java/org/apache/plc4x/java/bacnetip/connection/PassiveBacNetIpPlcConnection.java
@@ -44,13 +44,16 @@ public class PassiveBacNetIpPlcConnection extends NettyPlcConnection implements
 
     private static final Logger logger = LoggerFactory.getLogger(PassiveBacNetIpPlcConnection.class);
 
-    public PassiveBacNetIpPlcConnection(RawSocketIpAddress address, String params) {
+    private final ChannelHandler handler;
+
+    public PassiveBacNetIpPlcConnection(RawSocketIpAddress address, String params, ChannelHandler handler) {
         this(new RawSocketChannelFactory(address.getDeviceName(), null,
-            address.getPort(), RawSocketAddress.ALL_PROTOCOLS, new UdpIpPacketHandler()), params);
+            address.getPort(), RawSocketAddress.ALL_PROTOCOLS, new UdpIpPacketHandler()), params, handler);
     }
 
-    public PassiveBacNetIpPlcConnection(ChannelFactory channelFactory, String params) {
+    public PassiveBacNetIpPlcConnection(ChannelFactory channelFactory, String params, ChannelHandler handler) {
         super(channelFactory, true);
+        this.handler = handler;
     }
 
     @Override
@@ -82,7 +85,7 @@ public class PassiveBacNetIpPlcConnection extends NettyPlcConnection implements
                     }
                 });
                 pipeline.addLast(new BacNetIpProtocol());
-                pipeline.addLast(new HelloWorldProtocol());
+                pipeline.addLast(handler);
             }
         };
     }
diff --git a/sandbox/test-java-bacnetip-driver/src/test/java/org/apache/plc4x/java/bacnetip/PassiveBacNetIpDriverManual.java b/sandbox/test-java-bacnetip-driver/src/test/java/org/apache/plc4x/java/bacnetip/PassiveBacNetIpDriverManual.java
index 68f29a6..07852f1 100644
--- a/sandbox/test-java-bacnetip-driver/src/test/java/org/apache/plc4x/java/bacnetip/PassiveBacNetIpDriverManual.java
+++ b/sandbox/test-java-bacnetip-driver/src/test/java/org/apache/plc4x/java/bacnetip/PassiveBacNetIpDriverManual.java
@@ -19,6 +19,7 @@ under the License.
 package org.apache.plc4x.java.bacnetip;
 
 import org.apache.plc4x.java.bacnetip.connection.PassiveBacNetIpPlcConnection;
+import org.apache.plc4x.java.bacnetip.protocol.HelloWorldProtocol;
 import org.apache.plc4x.java.base.connection.NettyPlcConnection;
 import org.apache.plc4x.java.base.connection.PcapChannelFactory;
 import org.apache.plc4x.java.utils.pcapsockets.netty.PcapSocketAddress;
@@ -34,7 +35,7 @@ public class PassiveBacNetIpDriverManual {
             //new File("/Users/christofer.dutz/Projects/Apache/PLC4X-Documents/BacNET/Captures/Merck/BACnetWhoIsRouterToNetwork.pcapng"), null,
             new File("/Users/christofer.dutz/Downloads/20190906_udp.pcapng"), null,
             PassiveBacNetIpDriver.BACNET_IP_PORT, PcapSocketAddress.ALL_PROTOCOLS,
-            PcapSocketChannelConfig.NO_THROTTLING, new UdpIpPacketHandler()), "");
+            PcapSocketChannelConfig.NO_THROTTLING, new UdpIpPacketHandler()), "", new HelloWorldProtocol());
         connection.connect();
     }