Posted to commits@plc4x.apache.org by ot...@apache.org on 2022/08/11 00:35:23 UTC

[plc4x] branch develop updated: Apache Nifi plc4x record processor 2 (#439)

This is an automated email from the ASF dual-hosted git repository.

otto pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 78da5b982 Apache Nifi plc4x record processor 2 (#439)
78da5b982 is described below

commit 78da5b9823c6e4678eea7853bd4ab191b597979a
Author: iñigo <an...@gmail.com>
AuthorDate: Thu Aug 11 02:35:18 2022 +0200

    Apache Nifi plc4x record processor 2 (#439)
    
    * new brannch update
    
    * correct README and comments
    
    * README _type
    
    * processor dynamic properties for Plc var address, PlcAddressString parse method
    
    * README update
    
    * removed Plc Address String, leaving Dynamic properties approach
    
    * update readme
    
    remove querystring doc, now dynamic properties are used
    
    * Update README.md
    
    opcua example
    
    Co-authored-by: inigo <in...@persefone.zylk.net>
    Co-authored-by: Gustavo Fernández <gu...@zylk.net>
    Co-authored-by: AlvSel <Al...@users.noreply.github.com>
---
 plc4j/integrations/apache-nifi/README.md           | 101 ++++++++
 .../apache-nifi/nifi-plc4x-nar/bin/.gitignore      |   2 +
 .../apache-nifi/nifi-plc4x-nar/pom.xml             | 279 +++++++++++----------
 .../nifi-plc4x-processors/{ => bin}/pom.xml        |   0
 .../services/org.apache.nifi.processor.Processor   |   0
 .../bin/src/test/resources/logback-test.xml        |  36 +++
 .../apache-nifi/nifi-plc4x-processors/pom.xml      | 223 +++++++++++++---
 .../org/apache/plc4x/nifi/BasePlc4xProcessor.java  | 110 +++++---
 .../org/apache/plc4x/nifi/Plc4xSinkProcessor.java  |   4 +-
 .../apache/plc4x/nifi/Plc4xSourceProcessor.java    |  12 +-
 .../plc4x/nifi/Plc4xSourceRecordProcessor.java     | 218 ++++++++++++++++
 .../record/Plc4xFullReadResponseRowCallback.java   |  14 ++
 .../nifi/record/Plc4xReadResponseRecordSet.java    | 124 +++++++++
 .../nifi/record/Plc4xReadResponseRowCallback.java  |   9 +
 .../org/apache/plc4x/nifi/record/Plc4xWriter.java  |  57 +++++
 .../plc4x/nifi/record/RecordPlc4xWriter.java       | 147 +++++++++++
 .../org/apache/plc4x/nifi/util/Plc4xCommon.java    | 239 ++++++++++++++++++
 .../services/org.apache.nifi.processor.Processor   |   3 +-
 .../Plc4xSinkProcessorTest.java                    |   2 +-
 .../Plc4xSourceProcessorTest.java                  |   2 +-
 .../plc4x/nifi/Plc4xSourceRecordProcessorTest.java |  88 +++++++
 plc4j/integrations/apache-nifi/pom.xml             | 195 +++++++-------
 plc4j/pom.xml                                      |   1 +
 23 files changed, 1555 insertions(+), 311 deletions(-)
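
The commit message above describes replacing the single address string with dynamic properties, where each property name is a PLC variable name and its value is the address. A minimal sketch of that idea follows, using plain Java collections rather than the NiFi API. The class `DynamicAddressSketch` and the helper `toAddressMap` are hypothetical names introduced only for illustration and are not taken from the commit; the property names and values are the ones shown in the README example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class DynamicAddressSketch {

    // Hypothetical helper, not part of the commit: every property whose name is
    // not one of the fixed processor settings is treated as "variable name -> address".
    static Map<String, String> toAddressMap(Map<String, String> configured, Set<String> fixedNames) {
        Map<String, String> addresses = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : configured.entrySet()) {
            if (!fixedNames.contains(entry.getKey())) {
                addresses.put(entry.getKey(), entry.getValue());
            }
        }
        return addresses;
    }

    public static void main(String[] args) {
        // Illustrative property names and values, copied from the README example in this commit.
        Map<String, String> configured = new LinkedHashMap<>();
        configured.put("PLC connection String",
            "s7://10.105.143.7:102?remote-rack=0&remote-slot=1&controller-type=S7_1200");
        configured.put("Read timeout (miliseconds)", "10000");
        configured.put("var1", "%DB1:DBX0.0:BOOL");
        configured.put("var5", "%DB1:DBW04:INT");

        Set<String> fixedNames =
            Set.of("PLC connection String", "Record Writer", "Read timeout (miliseconds)");

        // Print one "name -> address" line per dynamic property.
        toAddressMap(configured, fixedNames)
            .forEach((name, address) -> System.out.println(name + " -> " + address));
    }
}
```

In the actual processor the properties come from the NiFi `ProcessContext`; the sketch only shows the name-to-address mapping that the README describes.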

diff --git a/plc4j/integrations/apache-nifi/README.md b/plc4j/integrations/apache-nifi/README.md
new file mode 100644
index 000000000..1dbe3d65b
--- /dev/null
+++ b/plc4j/integrations/apache-nifi/README.md
@@ -0,0 +1,101 @@
+# PLC4X Apache NiFi Integration
+
+## Plc4xSinkProcessor
+
+## Plc4xSourceProcessor
+
+## Plc4xSourceRecordProcessor
+
+This processor is <ins>record oriented</ins>, formatting output flowfile content using a Record Writer (for further information see [NiFi Documentation](https://nifi.apache.org/docs/nifi-docs/html/record-path-guide.html#overview)). 
+
+The Plc4xSourceRecord Processor can be configured using the following **properties**:
+
+- *PLC connection String:* PLC4X connection string used to connect to a given PLC device.
+- *Record Writer:* Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer may use Inherit Schema to emulate the inferred schema behavior, i.e. an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types.
+- *Read timeout (miliseconds):* Specifies the time, in milliseconds, after which a read on the connection times out.
+
+Then, the PLC variables to be accessed are specified using NiFi processor **Dynamic Properties**. For each variable, add a new property to the processor where the property name matches the variable name, and the property value corresponds to the address field.
+
+An *example* of these properties for reading values from an S7-1200:
+
+- *PLC connection String:* *s7://10.105.143.7:102?remote-rack=0&remote-slot=1&controller-type=S7_1200*
+- *Record Writer:* *PLC4x Embedded - AvroRecordSetWriter*
+- *Read timeout (miliseconds):* *10000*
+- *var1:* *%DB1:DBX0.0:BOOL*
+- *var2:* *%DB1:DBX0.1:BOOL*
+- *var3:* *%DB1:DBB01:BYTE*
+- *var4:* *%DB1:DBW02:WORD*
+- *var5:* *%DB1:DBW04:INT*
+
+Another *example* of these properties for reading values using OPC UA:
+- *PLC connection String:* *opcua:tcp://10.105.143.6:4840?discovery=false*
+- *Record Writer:* *PLC4x Embedded - AvroRecordSetWriter*
+- *Read timeout (miliseconds):* *10000*
+- *AcyclicReceiveBit00:* *ns=2;i=11*
+- *MaxCurrentI_max:* *ns=2;i=33*
+
+For the **Record Writer** property, any record writer included in NiFi can be used, such as JSON or CSV (custom writers can also be created). In this example, an Avro writer is supplied, configured as follows:
+
+- *Schema Write Strategy:* Embed Avro Schema
+- *Schema Cache:* No value set
+- *Schema Protocol Version:* 1
+- *Schema Access Strategy:* Inherit Record Schema
+- *Schema Registry:* No value set
+- *Schema Name:* ${schema.name}
+- *Schema Version:* No value set
+- *Schema Branch:* No value set
+- *Schema Text:* ${avro.schema}
+- *Compression Format:* NONE
+- *Cache Size:* 1000
+- *Encoder Pool Size:* 32
+
+
+The output flowfile will contain the PLC read values. This information is included in the flowfile content, following the Record Oriented presentation using a **schema** and the configuration specified in the Record Writer (format, schema inclusion, etc). In the schema, one field will be included for each of the variables defined taking into account the specified datatype. Also, a *ts* (timestamp) field is additionally included containing the read date. An example of the content of a flo [...]
+
+```
+[ {
+  "var1" : true,
+  "var2" : false,
+  "var3" : [ false, false, false, false, false, false, true, true ],
+  "var5" : 1992,
+  "var4" : [ false, false, false, false, false, false, false, false, false, false, false, true, false, false, false, true ],
+  "ts" : 1628783058433
+} ]
+```
+
+The Processor Scheduling Configuration also matters. The **Run Schedule** parameter sets the reading frequency (for example, *1 sec*). Note that its default value is 0 sec (run as fast as possible).
+
+Data type mapping between PLC values and Avro:
+
+| PLC type | Avro Type |
+|----------|-----------|
+| PlcBigDecimal | float |
+| PlcBigInteger | long |
+| PlcBitString | string |
+| PlcBOOL | boolean |
+| PlcBYTE | string |
+| PlcCHAR | string |
+| PlcDATE_AND_TIME | string |
+| PlcDATE | string |
+| PlcDINT | string |
+| PlcDWORD | string |
+| PlcINT | int |
+| PlcLINT | string |
+| PlcList | string |
+| PlcLREAL | string |
+| PlcLTIME | string |
+| PlcLWORD | string |
+| PlcNull | string |
+| PlcREAL | double |
+| PlcSINT | int |
+| PlcSTRING | string |
+| PlcStruct | string |
+| PlcTIME_OF_DAY | string |
+| PlcTIME | string |
+| PlcUDINT | string |
+| PlcUINT | string |
+| PlcULINT | string |
+| PlcUSINT | string |
+| PlcWCHAR | string |
+| PlcWORD | string |
+| ELSE | string |
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-nar/bin/.gitignore b/plc4j/integrations/apache-nifi/nifi-plc4x-nar/bin/.gitignore
new file mode 100644
index 000000000..26fd4d712
--- /dev/null
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-nar/bin/.gitignore
@@ -0,0 +1,2 @@
+/false-positives.xml
+/pom.xml
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-nar/pom.xml b/plc4j/integrations/apache-nifi/nifi-plc4x-nar/pom.xml
index 7c4ab210e..0ac86b45e 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-nar/pom.xml
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-nar/pom.xml
@@ -18,146 +18,155 @@
   under the License.
   -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
+	<modelVersion>4.0.0</modelVersion>
 
-  <parent>
-    <groupId>org.apache.plc4x</groupId>
-    <artifactId>plc4j-apache-nifi</artifactId>
-    <version>0.10.0-SNAPSHOT</version>
-  </parent>
+	<parent>
+		<groupId>org.apache.plc4x</groupId>
+		<artifactId>plc4j-apache-nifi</artifactId>
+		<version>0.10.0-SNAPSHOT</version>
+	</parent>
 
-  <artifactId>plc4j-nifi-plc4x-nar</artifactId>
-  <packaging>nar</packaging>
+	<artifactId>plc4j-nifi-plc4x-nar</artifactId>
+	<packaging>nar</packaging>
 
-  <name>PLC4J: Integrations: Apache Nifi: NAR</name>
+	<name>PLC4J: Integrations: Apache Nifi: NAR</name>
 
-  <properties>
-    <maven.javadoc.skip>true</maven.javadoc.skip>
-    <source.skip>true</source.skip>
-  </properties>
+	<properties>
+		<maven.javadoc.skip>true</maven.javadoc.skip>
+		<source.skip>true</source.skip>
+	</properties>
 
-  <build>
-    <pluginManagement>
-      <plugins>
-        <plugin>
-          <groupId>org.owasp</groupId>
-          <artifactId>dependency-check-maven</artifactId>
-          <configuration>
-            <suppressionFiles>${project.basedir}/false-positives.xml</suppressionFiles>
-          </configuration>
-        </plugin>
-        <plugin>
-          <groupId>org.apache.maven.plugins</groupId>
-          <artifactId>maven-dependency-plugin</artifactId>
-          <configuration>
-            <usedDependencies>
-              <usedDependency>org.apache.plc4x:plc4j-nifi-plc4x-processors</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-api</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-driver-ab-eth</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-driver-ads</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-driver-canopen</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-driver-eip</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-driver-knxnetip</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-driver-modbus</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-driver-opcua</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-driver-s7</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-driver-simulated</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-transport-pcap-replay</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-transport-raw-socket</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-transport-serial</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-transport-tcp</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-transport-udp</usedDependency>
-            </usedDependencies>
-          </configuration>
-        </plugin>
-      </plugins>
-    </pluginManagement>
-  </build>
+	<build>
+		<pluginManagement>
+			<plugins>
+				<plugin>
+					<groupId>org.owasp</groupId>
+					<artifactId>dependency-check-maven</artifactId>
+					<configuration>
+						<suppressionFiles>${project.basedir}/false-positives.xml</suppressionFiles>
+					</configuration>
+				</plugin>
+				<plugin>
+					<groupId>org.apache.maven.plugins</groupId>
+					<artifactId>maven-dependency-plugin</artifactId>
+					<configuration>
+						<usedDependencies>
+							<usedDependency>org.apache.plc4x:plc4j-nifi-plc4x-processors</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-api</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-driver-ab-eth</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-driver-ads</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-driver-canopen</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-driver-eip</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-driver-knxnetip</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-driver-modbus</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-driver-opcua</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-driver-s7</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-driver-simulated</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-transport-pcap-replay</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-transport-raw-socket</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-transport-serial</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-transport-tcp</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-transport-udp</usedDependency>
+						</usedDependencies>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+	</build>
 
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-nifi-plc4x-processors</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <!-- PLC4X -->
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-api</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <!-- Bundle Drivers -->
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-driver-ab-eth</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-driver-ads</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-driver-canopen</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-driver-eip</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-driver-knxnetip</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-driver-modbus</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-driver-opcua</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-driver-s7</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-driver-simulated</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-nifi-plc4x-processors</artifactId>
+			<version>${nifi-plc4x.version}</version>
+		</dependency>
 
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-transport-pcap-replay</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-transport-raw-socket</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-transport-serial</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-transport-tcp</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-transport-udp</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-  </dependencies>
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>nifi-standard-services-api-nar</artifactId>
+			<version>${nifi.version}</version>
+			<type>nar</type>
+			<scope>provided</scope>
+		</dependency>
 
-</project>
+		<!-- PLC4X -->
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-api</artifactId>
+			<version>${nifi-plc4x.version}</version>
+		</dependency>
+		<!-- Bundle Drivers -->
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-driver-ab-eth</artifactId>
+			<version>${nifi-plc4x.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-driver-ads</artifactId>
+			<version>${nifi-plc4x.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-driver-canopen</artifactId>
+			<version>${nifi-plc4x.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-driver-eip</artifactId>
+			<version>${nifi-plc4x.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-driver-knxnetip</artifactId>
+			<version>${nifi-plc4x.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-driver-modbus</artifactId>
+			<version>${nifi-plc4x.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-driver-opcua</artifactId>
+			<version>${nifi-plc4x.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-driver-s7</artifactId>
+			<version>${nifi-plc4x.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-driver-simulated</artifactId>
+			<version>${nifi-plc4x.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-transport-pcap-replay</artifactId>
+			<version>${nifi-plc4x.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-transport-raw-socket</artifactId>
+			<version>${nifi-plc4x.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-transport-serial</artifactId>
+			<version>${nifi-plc4x.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-transport-tcp</artifactId>
+			<version>${nifi-plc4x.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-transport-udp</artifactId>
+			<version>${nifi-plc4x.version}</version>
+		</dependency>
+	</dependencies>
+
+</project>
\ No newline at end of file
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/pom.xml b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/bin/pom.xml
similarity index 100%
copy from plc4j/integrations/apache-nifi/nifi-plc4x-processors/pom.xml
copy to plc4j/integrations/apache-nifi/nifi-plc4x-processors/bin/pom.xml
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/bin/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
similarity index 100%
copy from plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
copy to plc4j/integrations/apache-nifi/nifi-plc4x-processors/bin/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/bin/src/test/resources/logback-test.xml b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/bin/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..2b9cea25d
--- /dev/null
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/bin/src/test/resources/logback-test.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      https://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+  -->
+<configuration xmlns="http://ch.qos.logback/xml/ns/logback"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="
+                  http://ch.qos.logback/xml/ns/logback
+                  https://raw.githubusercontent.com/enricopulatzo/logback-XSD/master/src/main/xsd/logback.xsd">
+
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+    </encoder>
+  </appender>
+
+  <root level="error">
+    <appender-ref ref="STDOUT" />
+  </root>
+
+</configuration>
\ No newline at end of file
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/pom.xml b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/pom.xml
index c93c68e6e..327fcf851 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/pom.xml
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/pom.xml
@@ -18,40 +18,189 @@
   under the License.
   -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.plc4x</groupId>
-    <artifactId>plc4j-apache-nifi</artifactId>
-    <version>0.10.0-SNAPSHOT</version>
-  </parent>
-
-  <artifactId>plc4j-nifi-plc4x-processors</artifactId>
-
-  <name>PLC4J: Integrations: Apache Nifi: Processors</name>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-api</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-connection-pool</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.nifi</groupId>
-      <artifactId>nifi-api</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.nifi</groupId>
-      <artifactId>nifi-mock</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-</project>
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.plc4x</groupId>
+		<artifactId>plc4j-apache-nifi</artifactId>
+		<version>0.10.0-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>plc4j-nifi-plc4x-processors</artifactId>
+
+	<name>PLC4J: Integrations: Apache Nifi: Processors</name>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-api</artifactId>
+			<version>${nifi-plc4x.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-connection-pool</artifactId>
+			<version>${nifi-plc4x.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-spi</artifactId>
+			<version>${nifi-plc4x.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>nifi-api</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>nifi-record</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>nifi-record-serialization-service-api</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>nifi-utils</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>nifi-avro-record-utils</artifactId>
+			<version>${nifi.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.checkerframework</groupId>
+					<artifactId>checker-qual</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.google.errorprone</groupId>
+					<artifactId>error_prone_annotations</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>nifi-record-serialization-services</artifactId>
+			<version>${nifi.version}</version>
+			<scope>test</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>javax.validation</groupId>
+					<artifactId>validation-api</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>nifi-schema-registry-service-api</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>nifi-mock</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>nifi-mock-record-utils</artifactId>
+			<scope>test</scope>
+		</dependency>
+
+		<!-- These are for test purposes only; drivers are included in the NAR -->
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-driver-modbus</artifactId>
+			<version>${nifi-plc4x.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-driver-opcua</artifactId>
+			<version>${nifi-plc4x.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-driver-s7</artifactId>
+			<version>${nifi-plc4x.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-driver-simulated</artifactId>
+			<version>${nifi-plc4x.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-transport-pcap-replay</artifactId>
+			<version>${nifi-plc4x.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-transport-raw-socket</artifactId>
+			<version>${nifi-plc4x.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-transport-serial</artifactId>
+			<version>${nifi-plc4x.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-transport-tcp</artifactId>
+			<version>${nifi-plc4x.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-transport-udp</artifactId>
+			<version>${nifi-plc4x.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<!-- mvn -Drat.skip=true verify -X -->
+		<dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro</artifactId>
+			<version>${nifi-plc4x-avro.version}</version>
+			<scope>compile</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>org.osgi</groupId>
+					<artifactId>org.osgi.core</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.commons</groupId>
+					<artifactId>commons-compress</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>nifi-standard-nar</artifactId>
+			<version>${nifi.version}</version>
+			<type>nar</type>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>nifi-record</artifactId>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>nifi-record-serialization-service-api</artifactId>
+			<scope>compile</scope>
+		</dependency>
+
+
+	</dependencies>
+</project>
\ No newline at end of file
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
index f8d2bbcfd..2cd4ccda0 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
@@ -18,59 +18,77 @@
  */
 package org.apache.plc4x.nifi;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.*;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
 import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
 
-import java.util.*;
-
 public abstract class BasePlc4xProcessor extends AbstractProcessor {
 
-    private static final PropertyDescriptor PLC_CONNECTION_STRING = new PropertyDescriptor
+	protected static final PropertyDescriptor PLC_CONNECTION_STRING = new PropertyDescriptor
         .Builder().name("PLC_CONNECTION_STRING")
         .displayName("PLC connection String")
         .description("PLC4X connection string used to connect to a given PLC device.")
         .required(true)
         .addValidator(new Plc4xConnectionStringValidator())
         .build();
-    private static final PropertyDescriptor PLC_ADDRESS_STRING = new PropertyDescriptor
-        .Builder().name("PLC_ADDRESS_STRING")
-        .displayName("PLC resource address String")
-        .description("PLC4X address string used identify the resource to read/write on a given PLC device " +
-            "(Multiple values supported). The expected format is: {name}={address}(;{name}={address})*")
-        .required(true)
-        .addValidator(new Plc4xAddressStringValidator())
-        .build();
-
-    static final Relationship SUCCESS = new Relationship.Builder()
-        .name("SUCCESS")
-        .description("Successfully processed")
-        .build();
-    static final Relationship FAILURE = new Relationship.Builder()
-        .name("FAILURE")
+	
+    protected static final Relationship REL_SUCCESS = new Relationship.Builder()
+	    .name("success")
+	    .description("Successfully processed")
+	    .build();
+    
+    protected static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
         .description("An error occurred processing")
         .build();
 
-    private List<PropertyDescriptor> descriptors;
 
-    Set<Relationship> relationships;
+    protected List<PropertyDescriptor> properties;
+    protected Set<Relationship> relationships;
+  
+    protected String connectionString;
+    protected Map<String, String> addressMap;
 
-    private String connectionString;
-    private Map<String, String> addressMap;
 
     private final PooledPlcDriverManager driverManager = new PooledPlcDriverManager();
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
-        this.descriptors = Arrays.asList(PLC_CONNECTION_STRING, PLC_ADDRESS_STRING);
-        this.relationships = new HashSet<>(Arrays.asList(SUCCESS, FAILURE));
+    	final List<PropertyDescriptor> properties = new ArrayList<>();
+    	properties.add(PLC_CONNECTION_STRING);
+        this.properties = Collections.unmodifiableList(properties);
+
+    	
+    	final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
     }
 
+    public Map<String, String> getPlcAddress() {
+        return addressMap;
+    }
+    
     public String getConnectionString() {
         return connectionString;
     }
@@ -81,30 +99,40 @@ public abstract class BasePlc4xProcessor extends AbstractProcessor {
     String getAddress(String field) {
         return addressMap.get(field);
     }
-
-    @Override
+    
+	@Override
     public Set<Relationship> getRelationships() {
         return this.relationships;
     }
 
     @Override
     public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return descriptors;
+        return properties;
+    }
+    
+    // Dynamic properties: each property name is a field name and its value is the PLC address
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+                .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+                .required(false)
+                .dynamic(true)
+                .build();
     }
 
+
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
-        PropertyValue property = context.getProperty(PLC_CONNECTION_STRING.getName());
-        connectionString = property.getValue();
-        addressMap = new HashMap<>();
-        PropertyValue addresses = context.getProperty(PLC_ADDRESS_STRING.getName());
-        for (String segment : addresses.getValue().split(";")) {
-            String[] parts = segment.split("=");
-            if(parts.length != 2) {
-                throw new PlcRuntimeException("Invalid address format");
-            }
-            addressMap.put(parts[0], parts[1]);
-        }
+		connectionString = context.getProperty(PLC_CONNECTION_STRING.getName()).getValue();
+		addressMap = new HashMap<>();
+		//variables are passed as dynamic properties
+		context.getProperties().keySet().stream().filter(PropertyDescriptor::isDynamic).forEach(
+				t -> addressMap.put(t.getName(), context.getProperty(t.getName()).getValue()));
+		if (addressMap.isEmpty()) {
+			throw new PlcRuntimeException("No address specified");
+		}	
     }
 
     @Override
@@ -119,7 +147,7 @@ public abstract class BasePlc4xProcessor extends AbstractProcessor {
             return false;
         }
         BasePlc4xProcessor that = (BasePlc4xProcessor) o;
-        return Objects.equals(descriptors, that.descriptors) &&
+        return Objects.equals(properties, that.properties) &&
             Objects.equals(getRelationships(), that.getRelationships()) &&
             Objects.equals(getConnectionString(), that.getConnectionString()) &&
             Objects.equals(addressMap, that.addressMap);
@@ -127,7 +155,7 @@ public abstract class BasePlc4xProcessor extends AbstractProcessor {
 
     @Override
     public int hashCode() {
-        return Objects.hash(super.hashCode(), descriptors, getRelationships(), getConnectionString(), addressMap);
+        return Objects.hash(super.hashCode(), properties, getRelationships(), getConnectionString(), addressMap);
     }
 
     public static class Plc4xConnectionStringValidator implements Validator {
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
index e543bad52..e6b68bb5f 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
@@ -69,10 +69,10 @@ public class Plc4xSinkProcessor extends BasePlc4xProcessor {
             try {
                 final PlcWriteResponse plcWriteResponse = writeRequest.execute().get();
                 // TODO: Evaluate the response and create flow files for successful and unsuccessful updates
-                session.transfer(flowFile, SUCCESS);
+                session.transfer(flowFile, REL_SUCCESS);
             } catch (Exception e) {
                 flowFile = session.putAttribute(flowFile, "exception", e.getLocalizedMessage());
-                session.transfer(flowFile, FAILURE);
+                session.transfer(flowFile, REL_FAILURE);
             }
         } catch (ProcessException e) {
             throw e;
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
index dc6634a59..e41b439af 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
@@ -18,6 +18,10 @@
  */
 package org.apache.plc4x.nifi;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -31,10 +35,6 @@ import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.messages.PlcReadRequest;
 import org.apache.plc4x.java.api.messages.PlcReadResponse;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-
 @Tags({"plc4x-source"})
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
 @CapabilityDescription("Processor able to read data from industrial PLCs using Apache PLC4X")
@@ -69,14 +69,14 @@ public class Plc4xSourceProcessor extends BasePlc4xProcessor {
                         attributes.put(fieldName, String.valueOf(value));
                     }
                 }
-                flowFile = session.putAllAttributes(flowFile, attributes);
+                flowFile = session.putAllAttributes(flowFile, attributes);   
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 throw new ProcessException(e);
             } catch (ExecutionException e) {
                 throw new ProcessException(e);
             }
-            session.transfer(flowFile, SUCCESS);
+            session.transfer(flowFile, REL_SUCCESS);
         } catch (ProcessException e) {
             throw e;
         } catch (Exception e) {
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
new file mode 100644
index 000000000..9b455df22
--- /dev/null
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
@@ -0,0 +1,218 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+package org.apache.plc4x.nifi;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.util.StopWatch;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.nifi.record.Plc4xWriter;
+import org.apache.plc4x.nifi.record.RecordPlc4xWriter;
+
+@Tags({ "plc4x-source" })
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@CapabilityDescription("Processor able to read data from industrial PLCs using Apache PLC4X")
+@WritesAttributes({ @WritesAttribute(attribute = "value", description = "some value") })
+public class Plc4xSourceRecordProcessor extends BasePlc4xProcessor {
+
+	public static final String RESULT_ROW_COUNT = "plc4x.read.row.count";
+	public static final String RESULT_QUERY_DURATION = "plc4x.read.query.duration";
+	public static final String RESULT_QUERY_EXECUTION_TIME = "plc4x.read.query.executiontime";
+	public static final String RESULT_QUERY_FETCH_TIME = "plc4x.read.query.fetchtime";
+	public static final String INPUT_FLOWFILE_UUID = "input.flowfile.uuid";
+	public static final String RESULT_ERROR_MESSAGE = "plc4x.read.error.message";
+
+	public static final PropertyDescriptor PLC_RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder().name("plc4x-record-writer").displayName("Record Writer")
+		.description("Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer may use Inherit Schema to emulate the inferred schema behavior, i.e. "
+				+ "an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types.")
+		.identifiesControllerService(RecordSetWriterFactory.class)
+		.required(true)
+		.build();
+	
+	public static final PropertyDescriptor PLC_READ_FUTURE_TIMEOUT_MILISECONDS = new PropertyDescriptor.Builder().name("plc4x-record-read-timeout").displayName("Read timeout (milliseconds)")
+		.description("Read timeout in milliseconds")
+		.defaultValue("10000")
+		.required(true)
+		.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+		.build();
+
+	Integer readTimeout;
+	public Plc4xSourceRecordProcessor() {
+	}
+
+	@Override
+	protected void init(final ProcessorInitializationContext context) {
+		super.init(context);
+		final Set<Relationship> r = new HashSet<>();
+		r.addAll(super.getRelationships());
+		this.relationships = Collections.unmodifiableSet(r);
+
+		final List<PropertyDescriptor> pds = new ArrayList<>();
+		pds.addAll(super.getSupportedPropertyDescriptors());
+		pds.add(PLC_RECORD_WRITER_FACTORY);
+		pds.add(PLC_READ_FUTURE_TIMEOUT_MILISECONDS);
+		this.properties = Collections.unmodifiableList(pds);
+	}
+
+	@OnScheduled
+	@Override
+	public void onScheduled(final ProcessContext context) {
+        super.connectionString = context.getProperty(PLC_CONNECTION_STRING.getName()).getValue();
+        this.readTimeout = context.getProperty(PLC_READ_FUTURE_TIMEOUT_MILISECONDS.getName()).asInteger();
+		addressMap = new HashMap<>();
+		//variables are passed as dynamic properties
+		context.getProperties().keySet().stream().filter(PropertyDescriptor::isDynamic).forEach(
+				t -> addressMap.put(t.getName(), context.getProperty(t.getName()).getValue()));
+		if (addressMap.isEmpty()) {
+			throw new PlcRuntimeException("No address specified");
+		}	
+	}
+	
+	@Override
+	public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+		FlowFile fileToProcess = null;
+		// TODO: In the future the processor will be configurable to get the address and the connection from incoming flowfile
+		if (context.hasIncomingConnection()) {
+			fileToProcess = session.get();
+			// If we have no FlowFile, and all incoming connections are self-loops then we
+			// can continue on.
+			// However, if we have no FlowFile and we have connections coming from other
+			// Processors, then we know that we should run only if we have a FlowFile.
+			if (fileToProcess == null && context.hasNonLoopConnection()) {
+				return;
+			}
+		}
+		
+		Plc4xWriter plc4xWriter = new RecordPlc4xWriter(context.getProperty(PLC_RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class), fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
+		final ComponentLog logger = getLogger();
+		// Get an instance of a component able to read from a PLC.
+		// TODO: Change this to use NiFi service instead of direct connection
+		final AtomicLong nrOfRows = new AtomicLong(0L);
+		final StopWatch executeTime = new StopWatch(true);
+
+		try (PlcConnection connection = getDriverManager().getConnection(getConnectionString())) {
+
+			String inputFileUUID = fileToProcess == null ? null : fileToProcess.getAttribute(CoreAttributes.UUID.key());
+			Map<String, String> inputFileAttrMap = fileToProcess == null ? null : fileToProcess.getAttributes();
+			FlowFile resultSetFF;
+			if (fileToProcess == null) {
+				resultSetFF = session.create();
+			} else {
+				resultSetFF = session.create(fileToProcess);
+			}
+			if (inputFileAttrMap != null) {
+				resultSetFF = session.putAllAttributes(resultSetFF, inputFileAttrMap);
+			}
+
+			PlcReadRequest.Builder builder = connection.readRequestBuilder();
+			getFields().forEach(field -> {
+				String address = getAddress(field);
+				if (address != null) {
+					builder.addItem(field, address);
+				}
+			});
+			PlcReadRequest readRequest = builder.build();
+			final FlowFile originalFlowFile = fileToProcess;
+			resultSetFF = session.write(resultSetFF, out -> {
+				try {
+					PlcReadResponse readResponse = readRequest.execute().get(this.readTimeout, TimeUnit.MILLISECONDS);
+					
+					if(originalFlowFile == null) // no incoming FlowFile, so there are no inherited attributes to pass to the writer service
+						nrOfRows.set(plc4xWriter.writePlcReadResponse(readResponse, out, logger, null));
+					else 
+						nrOfRows.set(plc4xWriter.writePlcReadResponse(readResponse, out, logger, null, originalFlowFile));
+				} catch (InterruptedException e) {
+					logger.error("InterruptedException reading the data from PLC", e);
+		            Thread.currentThread().interrupt();
+		            throw new ProcessException(e);
+				} catch (TimeoutException e) {
+					logger.error("Timeout reading the data from PLC", e);
+					throw new ProcessException(e);
+				} catch (Exception e) {
+					logger.error("Exception reading the data from PLC", e);
+					throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
+				}
+			});
+			long executionTimeElapsed = executeTime.getElapsed(TimeUnit.MILLISECONDS);
+			final Map<String, String> attributesToAdd = new HashMap<>();
+			attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+			attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
+			if (inputFileUUID != null) {
+				attributesToAdd.put(INPUT_FLOWFILE_UUID, inputFileUUID);
+			}
+			attributesToAdd.putAll(plc4xWriter.getAttributesToAdd());
+			resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd);
+			plc4xWriter.updateCounters(session);
+			logger.info("{} contains {} records; transferring to 'success'", new Object[] { resultSetFF, nrOfRows.get() });
+			// Report a FETCH event if there was an incoming flow file, or a RECEIVE event
+			// otherwise
+			if (context.hasIncomingConnection()) {
+				session.getProvenanceReporter().fetch(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed);
+			} else {
+				session.getProvenanceReporter().receive(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed);
+			}
+			
+			session.transfer(resultSetFF, BasePlc4xProcessor.REL_SUCCESS);
+			// Need to remove the original input file if it exists
+			if (fileToProcess != null) {
+				session.remove(fileToProcess);
+				fileToProcess = null;
+			}
+			session.commitAsync();
+			
+		} catch (PlcConnectionException e) {
+			logger.error("Error getting the PLC connection", e);
+			throw new ProcessException("Got a PlcConnectionException while trying to get a connection", e);
+		} catch (Exception e) {
+			logger.error("Got an error while reading data from the PLC", e);
+			throw new ProcessException("Got an error while reading data from the PLC", e);
+		}
+	}
+
+}
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xFullReadResponseRowCallback.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xFullReadResponseRowCallback.java
new file mode 100644
index 000000000..4d8d80a08
--- /dev/null
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xFullReadResponseRowCallback.java
@@ -0,0 +1,14 @@
+package org.apache.plc4x.nifi.record;
+
+import java.io.IOException;
+
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+
+public class Plc4xFullReadResponseRowCallback implements Plc4xReadResponseRowCallback {
+
+	@Override
+	public void processRow(PlcReadResponse result) throws IOException {
+		// do nothing
+	}
+
+}
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
new file mode 100644
index 000000000..d22c60450
--- /dev/null
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
@@ -0,0 +1,124 @@
+package org.apache.plc4x.nifi.record;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.avro.Schema;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.value.PlcValue;
+import org.apache.plc4x.nifi.util.Plc4xCommon;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class Plc4xReadResponseRecordSet implements RecordSet, Closeable {
+    private static final Logger logger = LoggerFactory.getLogger(Plc4xReadResponseRecordSet.class);
+    private final PlcReadResponse readResponse;
+    private final Set<String> rsColumnNames;
+    private boolean moreRows;
+
+    // TODO: review this AtomicReference?
+	// TODO: this could be enhanced by checking whether the record schema should be updated (via a cache boolean, checking whether property values use NiFi expression language, etc.)
+  	private AtomicReference<RecordSchema> recordSchema;
+
+    public Plc4xReadResponseRecordSet(final PlcReadResponse readResponse) throws IOException {
+        this.readResponse = readResponse;
+        moreRows = true;
+        
+        logger.debug("Creating record schema from PlcReadResponse");
+        Map<String, ? extends PlcValue> responseDataStructure = readResponse.getAsPlcValue().getStruct();
+        rsColumnNames = responseDataStructure.keySet();
+               
+        if (recordSchema == null) {
+        	Schema avroSchema = Plc4xCommon.createSchema(responseDataStructure); //TODO: review this method as it is the 'mapping' from PlcValues to avro datatypes        	
+        	recordSchema = new AtomicReference<RecordSchema>();
+        	recordSchema.set(AvroTypeUtil.createSchema(avroSchema));
+        }
+        logger.debug("Record schema from PlcReadResponse successfully created.");
+
+    }
+
+    
+    @Override
+    public RecordSchema getSchema() {
+        return this.recordSchema.get();
+    }
+
+    // Protected methods for subclasses to access private member variables
+    protected PlcReadResponse getReadResponse() {
+        return readResponse;
+    }
+
+    protected boolean hasMoreRows() {
+        return moreRows;
+    }
+
+    protected void setMoreRows(boolean moreRows) {
+        this.moreRows = moreRows;
+    }
+
+    @Override
+    public Record next() throws IOException {
+        if (moreRows) {
+             final Record record = createRecord(readResponse);
+             setMoreRows(false);
+             return record;
+        } else {
+             return null;
+        }
+    }
+
+    @Override
+    public void close() {
+        //do nothing
+    }
+
+    protected Record createRecord(final PlcReadResponse readResponse) throws IOException{
+        final Map<String, Object> values = new HashMap<>(getSchema().getFieldCount());
+
+        logger.debug("creating record.");
+
+        for (final RecordField field : getSchema().getFields()) {
+            final String fieldName = field.getFieldName();
+
+            final Object value;
+            
+            if (rsColumnNames.contains(fieldName)) {
+            	value = normalizeValue(readResponse.getAsPlcValue().getValue(fieldName));
+            } else {
+                value = null;
+            }
+            
+            logger.trace(String.format("Adding %s field value to record.", fieldName));
+            values.put(fieldName, value);
+        }
+
+        //add timestamp field to record
+        values.put(Plc4xCommon.PLC4X_RECORD_TIMESTAMP_FIELD_NAME, System.currentTimeMillis());
+        logger.debug("added timestamp field to record.");
+
+        	
+        return new MapRecord(getSchema(), values);
+    }
+
+    @SuppressWarnings("rawtypes")
+    private Object normalizeValue(final PlcValue value) {
+        Object r = Plc4xCommon.normalizeValue(value);
+        logger.trace("Value data type: " + (r == null ? "null" : r.getClass()));
+        return r;
+        
+    }
+
+
+}
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRowCallback.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRowCallback.java
new file mode 100644
index 000000000..dacfadaff
--- /dev/null
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRowCallback.java
@@ -0,0 +1,9 @@
+package org.apache.plc4x.nifi.record;
+
+import java.io.IOException;
+
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+
+public interface Plc4xReadResponseRowCallback {
+	public void processRow(PlcReadResponse result) throws IOException;
+}
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xWriter.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xWriter.java
new file mode 100644
index 000000000..82b9ccfef
--- /dev/null
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xWriter.java
@@ -0,0 +1,57 @@
+package org.apache.plc4x.nifi.record;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+
+public interface Plc4xWriter {
+	/**
+     * Writes the given read response out to the given output stream, possibly applying a callback as each row is processed.
+     * @param response the PlcReadResponse to be written
+     * @param outputStream the OutputStream to write the read response to
+     * @param logger a common logger that can be used to log messages during write
+     * @param callback a Plc4xReadResponseRowCallback that may be called as each row in the response is processed
+     * @return the number of rows written to the output stream
+     * @throws Exception if any errors occur during the writing of the read response to the output stream
+     */
+    long writePlcReadResponse(PlcReadResponse response, OutputStream outputStream, ComponentLog logger,  Plc4xReadResponseRowCallback callback) throws Exception;
+    long writePlcReadResponse(PlcReadResponse response, OutputStream outputStream, ComponentLog logger,  Plc4xReadResponseRowCallback callback, FlowFile originalFlowFile) throws Exception;
+
+    /**
+     * Returns a map of attribute key/value pairs to be added to any outgoing flow file(s). The default implementation is to return an empty map.
+     * @return a map of attribute key/value pairs
+     */
+    default Map<String, String> getAttributesToAdd() {
+        return Collections.emptyMap();
+    }
+
+    /**
+     * Updates any session counters as a result of processing result sets. The default implementation is empty, no counters will be updated.
+     * @param session the session upon which to update counters
+     */
+    default void updateCounters(ProcessSession session) {
+    }
+
+    /**
+     * Writes an empty read response to the output stream, for the case where a PlcReadResponse
+     * has no rows to write. This method indicates the implementation should write whatever output is
+     * appropriate for a response with no rows.
+     * @param outputStream the OutputStream to write the empty read response to
+     * @param logger a common logger that can be used to log messages during write
+     * @throws IOException if any errors occur during the writing of an empty read response to the output stream
+     */
+    void writeEmptyPlcReadResponse(OutputStream outputStream, ComponentLog logger) throws IOException;
+    void writeEmptyPlcReadResponse(OutputStream outputStream, ComponentLog logger, FlowFile originalFlowFile) throws IOException;
+
+    /**
+     * Returns the MIME type of the output format. This can be used in FlowFile attributes or to perform format-specific processing as necessary.
+     * @return the MIME type string of the output format.
+     */
+    String getMimeType();
+}
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
new file mode 100644
index 000000000..3edf234a7
--- /dev/null
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
@@ -0,0 +1,147 @@
+package org.apache.plc4x.nifi.record;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+
+public class RecordPlc4xWriter implements Plc4xWriter {
+
+	private final RecordSetWriterFactory recordSetWriterFactory;
+	private final AtomicReference<WriteResult> writeResultRef;
+	private final Map<String, String> originalAttributes;
+    private String mimeType;
+	
+	private RecordSet fullRecordSet;
+	private RecordSchema writeSchema;
+	
+	
+	public RecordPlc4xWriter(RecordSetWriterFactory recordSetWriterFactory, Map<String, String> originalAttributes) {
+		this.recordSetWriterFactory = recordSetWriterFactory;
+        this.writeResultRef = new AtomicReference<>();
+        this.originalAttributes = originalAttributes;
+	}
+
+	@Override
+	public long writePlcReadResponse(PlcReadResponse response, OutputStream outputStream, ComponentLog logger, Plc4xReadResponseRowCallback callback) throws Exception {
+		if (fullRecordSet == null) {
+            fullRecordSet = new Plc4xReadResponseRecordSetWithCallback(response, callback);
+            writeSchema = recordSetWriterFactory.getSchema(originalAttributes, fullRecordSet.getSchema());
+        }
+		Map<String, String> empty = new HashMap<>();
+        try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(logger, writeSchema, outputStream, empty)) {
+            writeResultRef.set(resultSetWriter.write(fullRecordSet));
+            if (mimeType == null) {
+                mimeType = resultSetWriter.getMimeType();
+            }
+            return writeResultRef.get().getRecordCount();
+        } catch (final Exception e) {
+            throw new IOException(e);
+        }
+	}
+	
+	@Override
+	public long writePlcReadResponse(PlcReadResponse response, OutputStream outputStream, ComponentLog logger, Plc4xReadResponseRowCallback callback, FlowFile originalFlowFile) throws Exception {
+		if (fullRecordSet == null) {	
+            fullRecordSet = new Plc4xReadResponseRecordSetWithCallback(response, callback);
+            writeSchema = recordSetWriterFactory.getSchema(originalAttributes, fullRecordSet.getSchema());
+         }
+        try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(logger, writeSchema, outputStream, originalFlowFile)) {
+            writeResultRef.set(resultSetWriter.write(fullRecordSet));
+            if (mimeType == null) {
+                mimeType = resultSetWriter.getMimeType();
+            }
+            return writeResultRef.get().getRecordCount();
+        } catch (final Exception e) {
+            throw new IOException(e);
+        }
+	}
+
+	
+	@Override
+	public void writeEmptyPlcReadResponse(OutputStream outputStream, ComponentLog logger) throws IOException {
+		Map<String, String> empty = new HashMap<>();
+		try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(logger, writeSchema, outputStream, empty)) {
+            mimeType = resultSetWriter.getMimeType();
+            resultSetWriter.beginRecordSet();
+            resultSetWriter.finishRecordSet();
+        } catch (final Exception e) {
+            throw new IOException(e);
+        }
+	}
+	
+	@Override
+	public void writeEmptyPlcReadResponse(OutputStream outputStream, ComponentLog logger, FlowFile originalFlowFile) throws IOException {
+		try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(logger, writeSchema, outputStream, originalFlowFile)) {
+            mimeType = resultSetWriter.getMimeType();
+            resultSetWriter.beginRecordSet();
+            resultSetWriter.finishRecordSet();
+        } catch (final Exception e) {
+            throw new IOException(e);
+        }
+	}
+
+	@Override
+	public String getMimeType() {
+		return mimeType;
+	}
+	
+	@Override
+    public Map<String, String> getAttributesToAdd() {
+        Map<String, String> attributesToAdd = new HashMap<>();
+        attributesToAdd.put(CoreAttributes.MIME_TYPE.key(), mimeType);
+        // Add any attributes from the record writer (if present)
+        final WriteResult result = writeResultRef.get();
+        if (result != null) {
+            if (result.getAttributes() != null) {
+                attributesToAdd.putAll(result.getAttributes());
+            }
+            attributesToAdd.put("record.count", String.valueOf(result.getRecordCount()));
+        }
+        return attributesToAdd;
+    }
+
+	@Override
+    public void updateCounters(ProcessSession session) {
+        final WriteResult result = writeResultRef.get();
+        if (result != null) {
+            session.adjustCounter("Records Written", result.getRecordCount(), false);
+        }
+    }
+	
+	private static class Plc4xReadResponseRecordSetWithCallback extends Plc4xReadResponseRecordSet {
+        private final Plc4xReadResponseRowCallback callback;
+        public Plc4xReadResponseRecordSetWithCallback(final PlcReadResponse readResponse, Plc4xReadResponseRowCallback callback) throws IOException {
+            super(readResponse);
+            this.callback = callback;
+        }
+        @Override
+        public Record next() throws IOException {
+                if (hasMoreRows()) {
+                	PlcReadResponse response = getReadResponse();
+                    final Record record = createRecord(response);
+                    setMoreRows(false);
+                    if (callback != null) {
+                        callback.processRow(response);
+                    }
+                    return record;
+                } else {
+                    return null;
+                }
+        }
+	}
+
+}
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/util/Plc4xCommon.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/util/Plc4xCommon.java
new file mode 100644
index 000000000..0351f0a85
--- /dev/null
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/util/Plc4xCommon.java
@@ -0,0 +1,239 @@
+package org.apache.plc4x.nifi.util;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.SchemaBuilder.FieldAssembler;
+//TODO review remaining datatypes
+import org.apache.plc4x.java.api.value.PlcValue;
+import org.apache.plc4x.java.spi.values.PlcBOOL;
+import org.apache.plc4x.java.spi.values.PlcBYTE;
+import org.apache.plc4x.java.spi.values.PlcBigDecimal;
+import org.apache.plc4x.java.spi.values.PlcBigInteger;
+import org.apache.plc4x.java.spi.values.PlcBitString;
+import org.apache.plc4x.java.spi.values.PlcCHAR;
+import org.apache.plc4x.java.spi.values.PlcDATE;
+import org.apache.plc4x.java.spi.values.PlcDATE_AND_TIME;
+import org.apache.plc4x.java.spi.values.PlcDINT;
+import org.apache.plc4x.java.spi.values.PlcDWORD;
+import org.apache.plc4x.java.spi.values.PlcINT;
+import org.apache.plc4x.java.spi.values.PlcLINT;
+import org.apache.plc4x.java.spi.values.PlcLREAL;
+import org.apache.plc4x.java.spi.values.PlcLTIME;
+import org.apache.plc4x.java.spi.values.PlcLWORD;
+import org.apache.plc4x.java.spi.values.PlcList;
+import org.apache.plc4x.java.spi.values.PlcNull;
+import org.apache.plc4x.java.spi.values.PlcREAL;
+import org.apache.plc4x.java.spi.values.PlcSINT;
+import org.apache.plc4x.java.spi.values.PlcSTRING;
+import org.apache.plc4x.java.spi.values.PlcStruct;
+import org.apache.plc4x.java.spi.values.PlcTIME;
+import org.apache.plc4x.java.spi.values.PlcTIME_OF_DAY;
+import org.apache.plc4x.java.spi.values.PlcUDINT;
+import org.apache.plc4x.java.spi.values.PlcUINT;
+import org.apache.plc4x.java.spi.values.PlcULINT;
+import org.apache.plc4x.java.spi.values.PlcUSINT;
+import org.apache.plc4x.java.spi.values.PlcWCHAR;
+import org.apache.plc4x.java.spi.values.PlcWORD;
+
+
+
+public class Plc4xCommon {
+
+	
+	public static final String PLC4X_RECORD_TIMESTAMP_FIELD_NAME = "ts";
+	
+	/**
+	 * Infers the output Avro schema directly from the contents of a PlcReadResponse object.
+	 * It is used from the RecordPlc4xWriter.writePlcReadResponse() method.
+	 * However, to make sure the output schema does not change, it is built from the processor configuration (variable memory addresses).
+	 * 
+	 * At the moment this method does not handle the following object types: PlcValueAdapter, PlcIECValue<T>, PlcSimpleValue<T>.
+	 * 
+	 * @param responseDataStructure a map that reflects the structure of the response returned by the PLC for a read request.
+	 * @return the Avro schema built from responseDataStructure.
+	 */
+	public static Schema createSchema(Map<String, ? extends PlcValue> responseDataStructure){
+		// map each PLC value type to a nullable Avro field type
+		final FieldAssembler<Schema> builder = SchemaBuilder.record("PlcReadResponse").namespace("any.data").fields();	
+		String fieldName = null;
+		
+		for (Map.Entry<String, ? extends PlcValue> entry : responseDataStructure.entrySet()) {
+			fieldName = entry.getKey();
+			if (entry.getValue() instanceof PlcBigDecimal) {
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().floatType().endUnion().noDefault(); 				
+			}else if (entry.getValue() instanceof PlcBigInteger) {
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().longType().endUnion().noDefault();
+			}else if (entry.getValue() instanceof PlcBitString) {
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+			}else if (entry.getValue() instanceof PlcBOOL) {
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().booleanType().endUnion().noDefault();
+			}else if (entry.getValue() instanceof PlcBYTE) {
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().bytesType().endUnion().noDefault();
+			}else if (entry.getValue() instanceof PlcCHAR) {	
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+			}else if (entry.getValue() instanceof PlcDATE_AND_TIME) {
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();	
+			}else if (entry.getValue() instanceof PlcDATE) {
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+			}else if (entry.getValue() instanceof PlcDINT) {
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+			}else if (entry.getValue() instanceof PlcDWORD) {
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+			}else if (entry.getValue() instanceof PlcINT) {
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault();				
+			}else if (entry.getValue() instanceof PlcLINT) {
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+			}else if (entry.getValue() instanceof PlcLREAL) {
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+			}else if (entry.getValue() instanceof PlcLTIME) {
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+			}else if (entry.getValue() instanceof PlcLWORD) {
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+			}else if (entry.getValue() instanceof PlcNull) {
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+			}else if (entry.getValue() instanceof PlcREAL) {
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().doubleType().endUnion().noDefault();		
+			}else if (entry.getValue() instanceof PlcSINT) {
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().intType().endUnion().noDefault();
+			}else if (entry.getValue() instanceof PlcSTRING) {
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+			}else if (entry.getValue() instanceof PlcStruct) {
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+			}else if (entry.getValue() instanceof PlcTIME_OF_DAY) {
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+			}else if (entry.getValue() instanceof PlcTIME) {
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+			}else if (entry.getValue() instanceof PlcUDINT) {
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+			}else if (entry.getValue() instanceof PlcUINT) {
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+			}else if (entry.getValue() instanceof PlcULINT) {
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+			}else if (entry.getValue() instanceof PlcUSINT) {
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+			}else if (entry.getValue() instanceof PlcWCHAR) {
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();
+			}else if (entry.getValue() instanceof PlcWORD) {
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();				
+			}else if(entry.getValue() instanceof PlcList) {
+				if(!entry.getValue().getList().isEmpty()) {
+					if(entry.getValue().getList().get(0) instanceof PlcBOOL) {
+						builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().array().items().booleanType().endUnion().noDefault();
+					}
+				} else {
+					builder.name(fieldName).type().nullBuilder().endNull().noDefault();
+				}
+			}
+			else { //TODO try forcing any other datatype to string...
+				builder.name(fieldName).type().unionOf().nullBuilder().endNull().and().stringType().endUnion().noDefault();	
+			}
+		}
+		
+		//add timestamp field to schema
+		builder.name(PLC4X_RECORD_TIMESTAMP_FIELD_NAME).type().longType().noDefault();
+		
+		
+		return builder.endRecord();
+
+	}
+	
+	
+	private static Object normalizeBasicTypes(final Object valueOriginal) {
+		if (valueOriginal == null) {
+			return null;
+		} else if (valueOriginal instanceof PlcValue) {
+			PlcValue value = (PlcValue) valueOriginal;
+			if (value.isBoolean() && value instanceof PlcBOOL)
+				return value.getBoolean();
+			else if (value.isBigInteger() && value instanceof PlcBigInteger)
+				return value.getBigInteger();
+			else if (value.isBigDecimal() && value instanceof PlcBigDecimal)
+				return value.getBigDecimal();
+			else if (value.isByte() && value instanceof PlcBYTE)
+				return value.getByte();
+			else if (value.isDate() && value instanceof PlcDATE)
+				return value.getDate();
+			else if (value.isDateTime() && value instanceof PlcDATE_AND_TIME)
+				return value.getDateTime();
+			else if (value.isFloat() && value instanceof PlcLREAL)
+				return value.getFloat();
+			else if (value.isInteger() && value instanceof PlcINT)
+				return value.getInteger();
+			else if (value.isList() && value instanceof PlcList) // TODO
+				return value.getList().toArray();
+			else if (value.isDouble())
+				return value.getDouble();
+			else if (value.isDuration())
+				return value.getDuration();
+			else if (value.isLong())
+				return value.getLong();
+			else if (value.isShort())
+				return value.getShort();
+			else if (value.isString())
+				return value.getString();
+			else if (value.isTime())
+				return value.getTime();
+			else
+				return value.getString();
+		} else {
+			return valueOriginal;
+		}
+    
+	}
+	
+	public static Object normalizeValue(final Object valueOriginal) {
+        if (valueOriginal == null) {
+            return null;
+        }
+        if (valueOriginal instanceof List) {
+            return ((List<?>) valueOriginal).toArray();
+        } else  if (valueOriginal instanceof PlcValue) {
+        	PlcValue value = (PlcValue) valueOriginal;
+	        if(value.isBoolean() && value instanceof PlcBOOL)
+	        	return value.getBoolean();
+	        else if (value.isBigInteger() && value instanceof PlcBigInteger)
+	        	return value.getBigInteger();
+	        else if (value.isBigDecimal() && value instanceof PlcBigDecimal)
+	        	return value.getBigDecimal();
+	        else if (value.isByte() && value instanceof PlcBYTE)
+	        	return value.getByte();
+	        else if (value.isDate() && value instanceof PlcDATE)
+	        	return value.getDate();
+	        else if (value.isDateTime() && value instanceof PlcDATE_AND_TIME)
+	        	return value.getDateTime();
+	        else if (value.isFloat() && value instanceof PlcLREAL)
+	           	return value.getFloat();
+	        else if (value.isInteger() && value instanceof PlcINT)
+	           	return value.getInteger();
+	        else if (value.isList() && value instanceof PlcList) { //TODO
+	        	Object[] r = new Object[value.getList().size()];
+	        	int i = 0;
+	        	for (Object element : value.getList()) {
+	        		r[i] =  normalizeBasicTypes(element);
+	        		i++;
+				}
+	        	return r;
+	        }   	
+	        else if (value.isDouble())
+	        	return value.getDouble();
+	        else if (value.isDuration())
+	        	return value.getDuration();
+	        else if (value.isLong())
+	           	return value.getLong();
+	        else if (value.isShort())
+	           	return value.getShort();
+	        else if (value.isString())
+	          	return value.getString();
+	        else if (value.isTime())
+	          	return value.getTime();
+	        else 
+	        	return value.getString();
+        } else {
+        	return valueOriginal;
+        }
+    }
+	
+}
+
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index a177318ca..8f55535ae 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -13,4 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 org.apache.plc4x.nifi.Plc4xSinkProcessor
-org.apache.plc4x.nifi.Plc4xSourceProcessor
\ No newline at end of file
+org.apache.plc4x.nifi.Plc4xSourceProcessor
+org.apache.plc4x.nifi.Plc4xSourceRecordProcessor
\ No newline at end of file
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/processors/plc4x4nifi/Plc4xSinkProcessorTest.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkProcessorTest.java
similarity index 96%
rename from plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/processors/plc4x4nifi/Plc4xSinkProcessorTest.java
rename to plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkProcessorTest.java
index 1f00bea46..e9ce18eb2 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/processors/plc4x4nifi/Plc4xSinkProcessorTest.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkProcessorTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.plc4x.processors.plc4x4nifi;
+package org.apache.plc4x.nifi;
 
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/processors/plc4x4nifi/Plc4xSourceProcessorTest.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceProcessorTest.java
similarity index 96%
rename from plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/processors/plc4x4nifi/Plc4xSourceProcessorTest.java
rename to plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceProcessorTest.java
index 71611dab0..711c366e2 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/processors/plc4x4nifi/Plc4xSourceProcessorTest.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceProcessorTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.plc4x.processors.plc4x4nifi;
+package org.apache.plc4x.nifi;
 
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessorTest.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessorTest.java
new file mode 100644
index 000000000..4550406ac
--- /dev/null
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessorTest.java
@@ -0,0 +1,88 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+package org.apache.plc4x.nifi;
+import org.apache.nifi.avro.AvroRecordSetWriter;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+
+public class Plc4xSourceRecordProcessorTest {
+	
+    private TestRunner testRunner;
+    private static final int NUMBER_OF_CALLS = 5;
+    
+    @BeforeEach
+    public void init() throws InitializationException {
+    	        	
+    	testRunner = TestRunners.newTestRunner(Plc4xSourceRecordProcessor.class);
+    	testRunner.setIncomingConnection(false);
+    	testRunner.setValidateExpressionUsage(false);
+    	testRunner.setProperty(Plc4xSourceRecordProcessor.PLC_READ_FUTURE_TIMEOUT_MILISECONDS, "100");
+    	testRunner.setProperty(Plc4xSourceRecordProcessor.PLC_CONNECTION_STRING, "s7://10.105.143.7:102?remote-rack=0&remote-slot=1&controller-type=S7_1200");
+    	testRunner.setProperty("var1", "%DB1:DBX0.0:BOOL");
+    	testRunner.setProperty("var2", "%DB1:DBX0.1:BOOL");
+    	testRunner.setProperty("var3", "%DB1:DBB01:BYTE");
+    	testRunner.setProperty("var4", "%DB1:DBW02:WORD");
+    	testRunner.setProperty("var5", "%DB1:DBW04:INT");
+    	testRunner.addConnection(Plc4xSourceRecordProcessor.REL_SUCCESS);
+    	testRunner.addConnection(Plc4xSourceRecordProcessor.REL_FAILURE);
+    }
+
+    @Test
+    public void testMockRecordWriterProcessor() throws InitializationException {
+    	final MockRecordWriter writerService = new MockRecordWriter("header", false);
+    	testRunner.addControllerService("writer", writerService);
+    	testRunner.enableControllerService(writerService);
+    	testRunner.setProperty(Plc4xSourceRecordProcessor.PLC_RECORD_WRITER_FACTORY.getName(), "writer");
+    	testRunner.run(NUMBER_OF_CALLS,true, true);
+    	//validations
+    	testRunner.assertTransferCount(Plc4xSourceRecordProcessor.REL_FAILURE, 0);
+    	testRunner.assertTransferCount(Plc4xSourceRecordProcessor.REL_SUCCESS, NUMBER_OF_CALLS);
+    }
+    
+    @Test
+    public void testJsonRecordWriterProcessor() throws InitializationException {
+    	final JsonRecordSetWriter writerService = new  JsonRecordSetWriter();
+    	testRunner.addControllerService("writer", writerService);
+    	testRunner.enableControllerService(writerService);
+    	testRunner.setProperty(Plc4xSourceRecordProcessor.PLC_RECORD_WRITER_FACTORY.getName(), "writer");
+    	testRunner.run(NUMBER_OF_CALLS,true, true);
+    	//validations
+    	testRunner.assertTransferCount(Plc4xSourceRecordProcessor.REL_FAILURE, 0);
+    	testRunner.assertTransferCount(Plc4xSourceRecordProcessor.REL_SUCCESS, NUMBER_OF_CALLS);
+    }
+    
+    @Test
+    public void testAvroRecordWriterProcessor() throws InitializationException {
+    	final AvroRecordSetWriter writerService = new  AvroRecordSetWriter();
+    	testRunner.addControllerService("writer", writerService);
+    	testRunner.enableControllerService(writerService);
+    	testRunner.setProperty(Plc4xSourceRecordProcessor.PLC_RECORD_WRITER_FACTORY.getName(), "writer");
+    	testRunner.run(NUMBER_OF_CALLS,true, true);
+    	//validations
+    	testRunner.assertTransferCount(Plc4xSourceRecordProcessor.REL_FAILURE, 0);
+    	testRunner.assertTransferCount(Plc4xSourceRecordProcessor.REL_SUCCESS, NUMBER_OF_CALLS);
+    }
+
+}
diff --git a/plc4j/integrations/apache-nifi/pom.xml b/plc4j/integrations/apache-nifi/pom.xml
index 6845a5366..3aaa23c97 100644
--- a/plc4j/integrations/apache-nifi/pom.xml
+++ b/plc4j/integrations/apache-nifi/pom.xml
@@ -18,98 +18,119 @@
   under the License.
   -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
+	<modelVersion>4.0.0</modelVersion>
 
-  <parent>
-    <groupId>org.apache.plc4x</groupId>
-    <artifactId>plc4j-integrations</artifactId>
-    <version>0.10.0-SNAPSHOT</version>
-  </parent>
+	<parent>
+		<groupId>org.apache.plc4x</groupId>
+		<artifactId>plc4j-integrations</artifactId>
+		<version>0.10.0-SNAPSHOT</version>
+	</parent>
 
-  <artifactId>plc4j-apache-nifi</artifactId>
-  <packaging>pom</packaging>
+	<artifactId>plc4j-apache-nifi</artifactId>
+	<packaging>pom</packaging>
 
-  <name>PLC4J: Integrations: Apache Nifi</name>
-  <description>Integration module for integrating PLC4X into Apache Nifi.</description>
+	<name>PLC4J: Integrations: Apache Nifi</name>
+	<description>Integration module for integrating PLC4X into Apache Nifi.</description>
 
-  <properties>
-    <nifi.version>1.16.3</nifi.version>
-  </properties>
+	<properties>
+		<nifi.version>1.16.3</nifi.version>
+		<nifi-plc4x.version>0.10.0-SNAPSHOT</nifi-plc4x.version>
+		<nifi-plc4x-avro.version>1.8.1</nifi-plc4x-avro.version>
+	</properties>
 
-  <modules>
-    <module>nifi-plc4x-processors</module>
-    <module>nifi-plc4x-nar</module>
-  </modules>
+	<modules>
+		<module>nifi-plc4x-processors</module>
+		<module>nifi-plc4x-nar</module>
+	</modules>
 
-  <build>
-    <plugins>
-      <!-- Add the ability to create nar packages -->
-      <plugin>
-        <groupId>org.apache.nifi</groupId>
-        <artifactId>nifi-nar-maven-plugin</artifactId>
-        <version>1.3.3</version>
-        <extensions>true</extensions>
-      </plugin>
-      <!-- Makes buildRevision and buildBranch available to the NAR Plugin, so they can be populated in the MANIFEST -->
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>buildnumber-maven-plugin</artifactId>
-        <inherited>true</inherited>
-        <executions>
-          <execution>
-            <phase>validate</phase>
-            <goals>
-              <goal>create</goal>
-            </goals>
-          </execution>
-        </executions>
-        <configuration>
-          <doCheck>false</doCheck>
-          <doUpdate>false</doUpdate>
-          <shortRevisionLength>7</shortRevisionLength>
-          <getRevisionOnlyOnce>true</getRevisionOnlyOnce>
-          <revisionOnScmFailure />
-          <buildNumberPropertyName>buildRevision</buildNumberPropertyName>
-          <scmBranchPropertyName>buildBranch</scmBranchPropertyName>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
+	<build>
+		<plugins>
+			<!-- Add the ability to create nar packages -->
+			<plugin>
+				<groupId>org.apache.nifi</groupId>
+				<artifactId>nifi-nar-maven-plugin</artifactId>
+				<version>1.3.3</version>
+				<extensions>true</extensions>
+			</plugin>
+			<!-- Makes buildRevision and buildBranch available to the NAR Plugin, so they can be populated in the MANIFEST -->
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>buildnumber-maven-plugin</artifactId>
+				<inherited>true</inherited>
+				<executions>
+					<execution>
+						<phase>validate</phase>
+						<goals>
+							<goal>create</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<doCheck>false</doCheck>
+					<doUpdate>false</doUpdate>
+					<shortRevisionLength>7</shortRevisionLength>
+					<getRevisionOnlyOnce>true</getRevisionOnlyOnce>
+					<revisionOnScmFailure />
+					<buildNumberPropertyName>buildRevision</buildNumberPropertyName>
+					<scmBranchPropertyName>buildBranch</scmBranchPropertyName>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
 
-  <dependencyManagement>
-    <dependencies>
-      <dependency>
-        <groupId>org.apache.nifi</groupId>
-        <artifactId>nifi-api</artifactId>
-        <version>${nifi.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.nifi</groupId>
-        <artifactId>nifi-mock</artifactId>
-        <version>${nifi.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>io.swagger</groupId>
-        <artifactId>swagger-annotations</artifactId>
-        <version>1.6.6</version>
-      </dependency>
-      <dependency>
-        <groupId>org.glassfish.jaxb</groupId>
-        <artifactId>jaxb-runtime</artifactId>
-        <version>${jaxb.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>jakarta.xml.bind</groupId>
-        <artifactId>jakarta.xml.bind-api</artifactId>
-        <version>4.0.0</version>
-        <exclusions>
-          <exclusion>
-            <groupId>jakarta.activation</groupId>
-            <artifactId>jakarta.activation-api</artifactId>
-          </exclusion>
-        </exclusions>
-      </dependency>
-    </dependencies>
-  </dependencyManagement>
+	<dependencyManagement>
+		<dependencies>
+			<dependency>
+				<groupId>org.apache.nifi</groupId>
+				<artifactId>nifi</artifactId>
+				<version>${nifi.version}</version>
+				<type>pom</type>
+				<scope>import</scope>
+			</dependency>
+			<dependency>
+				<groupId>org.apache.nifi</groupId>
+				<artifactId>nifi-api</artifactId>
+				<version>${nifi.version}</version>
+			</dependency>
+			<dependency>
+				<groupId>org.apache.nifi</groupId>
+				<artifactId>nifi-mock</artifactId>
+				<version>${nifi.version}</version>
+			</dependency>
+			<dependency>
+				<groupId>org.apache.nifi</groupId>
+				<artifactId>nifi-utils</artifactId>
+				<version>${nifi.version}</version>
+			</dependency>
+			<dependency>
+				<groupId>org.apache.nifi</groupId>
+				<artifactId>nifi-nar-bundles</artifactId>
+				<version>${nifi.version}</version>
+				<type>pom</type>
+				<scope>import</scope>
+			</dependency>
+			<dependency>
+				<groupId>io.swagger</groupId>
+				<artifactId>swagger-annotations</artifactId>
+				<version>1.6.6</version>
+			</dependency>
+			<dependency>
+				<groupId>org.glassfish.jaxb</groupId>
+				<artifactId>jaxb-runtime</artifactId>
+				<version>${jaxb.version}</version>
+			</dependency>
+			<dependency>
+				<groupId>jakarta.xml.bind</groupId>
+				<artifactId>jakarta.xml.bind-api</artifactId>
+				<version>4.0.0</version>
+				<exclusions>
+					<exclusion>
+						<groupId>jakarta.activation</groupId>
+						<artifactId>jakarta.activation-api</artifactId>
+					</exclusion>
+				</exclusions>
+			</dependency>
+		</dependencies>
+	</dependencyManagement>
 
-</project>
+</project>
\ No newline at end of file
diff --git a/plc4j/pom.xml b/plc4j/pom.xml
index aca4c0b81..f56b9b838 100644
--- a/plc4j/pom.xml
+++ b/plc4j/pom.xml
@@ -281,6 +281,7 @@
                 <ignoredDependency>org.codehaus.groovy:groovy-test-junit5</ignoredDependency>
                 <ignoredDependency>org.codehaus.groovy:groovy</ignoredDependency>
                 <ignoredDependency>com.athaydes:spock-reports</ignoredDependency>
+                <ignoredDependency>org.apache.nifi:nifi-standard-nar</ignoredDependency>                
               </ignoredDependencies>
             </configuration>
           </execution>