Posted to commits@plc4x.apache.org by cd...@apache.org on 2023/10/02 07:01:17 UTC

[plc4x] branch develop updated: feat(integration/nifi): Various improvements for Nifi integration

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4156cc9f1e feat(integration/nifi): Various improvements for Nifi integration
4156cc9f1e is described below

commit 4156cc9f1e65485ad7d21d3630835603727a5104
Author: Unai Lería Fortea <un...@gmail.com>
AuthorDate: Mon Oct 2 09:01:11 2023 +0200

    feat(integration/nifi): Various improvements for Nifi integration
    
    * Add tag validation
    
    * Major changes
    
    * Rewrite processors to make them more consistent
    * Split processors into smaller functions
    * Add timestamp field name
    * Allow expression language in connection string
    
    * Update readme
    
    * Add file address access strategy
    
    * Set a better exception attribute name in case of failure
    
    * Update readme
    
    * Move duplicated code to base processor
    
    * Add testing for tag validation on properties and address text
    
    * Add testing for file address access validation
    
    * Clean up
    
    * Add file address strategy test for all processors
---
 plc4j/integrations/apache-nifi/README.md           |  27 ++-
 .../org/apache/plc4x/nifi/BasePlc4xProcessor.java  | 250 +++++++++++++++++----
 .../plc4x/nifi/Plc4xListenRecordProcessor.java     | 157 ++++++-------
 .../org/apache/plc4x/nifi/Plc4xSinkProcessor.java  |  95 +++-----
 .../plc4x/nifi/Plc4xSinkRecordProcessor.java       | 161 ++++++-------
 .../apache/plc4x/nifi/Plc4xSourceProcessor.java    | 105 ++++-----
 .../plc4x/nifi/Plc4xSourceRecordProcessor.java     | 167 +++++++-------
 .../nifi/address/AddressesAccessStrategy.java      |  21 ++
 .../plc4x/nifi/address/AddressesAccessUtils.java   | 100 ++++++---
 .../plc4x/nifi/address/BaseAccessStrategy.java     | 124 ++++++++++
 .../address/DynamicPropertyAccessStrategy.java     |  35 ++-
 .../nifi/address/FilePropertyAccessStrategy.java   |  79 +++++++
 .../nifi/address/TextPropertyAccessStrategy.java   |  40 +++-
 .../nifi/record/Plc4xReadResponseRecordSet.java    |  22 +-
 .../org/apache/plc4x/nifi/record/Plc4xWriter.java  |   4 +-
 .../plc4x/nifi/record/RecordPlc4xWriter.java       |  22 +-
 .../org/apache/plc4x/nifi/record/SchemaCache.java  |   4 +-
 .../nifi/subscription/Plc4xListenerDispatcher.java |   3 +-
 .../org/apache/plc4x/nifi/util/Plc4xCommon.java    |   7 +-
 .../apache/plc4x/nifi/Plc4xSinkProcessorTest.java  |  19 +-
 .../plc4x/nifi/Plc4xSinkRecordProcessorTest.java   |  16 ++
 .../plc4x/nifi/Plc4xSourceProcessorTest.java       |  22 +-
 .../plc4x/nifi/Plc4xSourceRecordProcessorTest.java |  18 +-
 .../plc4x/nifi/address/AccessStrategyTest.java     | 190 ++++++++++++++++
 24 files changed, 1196 insertions(+), 492 deletions(-)

diff --git a/plc4j/integrations/apache-nifi/README.md b/plc4j/integrations/apache-nifi/README.md
index a80e6f8ed4..04aeed8654 100644
--- a/plc4j/integrations/apache-nifi/README.md
+++ b/plc4j/integrations/apache-nifi/README.md
@@ -20,8 +20,9 @@ under the License.
 
 ## Common properties
 The following properties apply to all Plc4x Processors:
-* Connection String: A constant connection string such as `s7://10.105.143.7:102?remote-rack=0&remote-slot=1&controller-type=S7_1200`.
-* Read/Write/Subscribe timeout (miliseconds): Specifies the time in milliseconds for the connection to return a timeout. In case of subscription the timeout is used to renew connections.
+* Connection String: A constant connection string such as `s7://10.105.143.7:102?remote-rack=0&remote-slot=1&controller-type=S7_1200` or a valid Expression Language expression ([Expression Language NiFi documentation](https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html)) such as `${plc4x.connection_string}`.
+* Timeout (miliseconds): Specifies the time in milliseconds after which a request times out. It is also used to renew subscription connections. Can be set with Expression Language.
+* Timestamp field name: Defines the name of the field that holds the time at which the response from the PLC was received. It is added to the flowfile attributes or to the record, depending on the processor used.
 * Address Access Strategy: defines how the processor obtains the PLC addresses. It can take one of the following values:
   * **Properties as Addresses:** 
       For each variable, add a new property to the processor where the property name matches the variable name, and the variable value corresponds to the address tag. 
@@ -48,6 +49,21 @@ The following properties applies to all Plc4x Processors:
     }
     ```
     If this JSON is in an attribute `plc4x.addresses` it can be accessed with *Address Text*=`${plc4x.addresses}`. 
+  
+  * **Address File:**
+    Property *Address File* must be supplied with the path to a JSON file that contains the variable names and address tags. Expression Language is supported.
+
+    For example, a file at:
+    - *Address File*:```/home/nifi/s7addresses.json```  
+    with the following content:
+    ```json
+    {
+      "var1" : "%DB1:DBX0.0:BOOL",
+      "var2" : "%DB1:DBX0.1:BOOL"
+    }
+    ```
+    If the file path is in an attribute `plc4x.addresses_file` it can be accessed with *Address File*=`${plc4x.addresses_file}`. 
+
 
 
 When reading from a PLC, the response is used to create a mapping from PLC types to Avro types. The mapping is done as follows:
@@ -87,10 +103,6 @@ Table of data mapping between plc data and Avro types (as specified in [Avro spe
 Also, it is important to keep in mind the Processor Scheduling Configuration. The reading frequency can be set with the **Run Schedule** parameter (for example, *1 sec*). Note that by default this value is set to 0 sec (as fast as possible).
 
 
-## Plc4xSinkProcessor
-
-## Plc4xSourceProcessor
-
 ## Plc4xSinkRecordProcessor
 
 This processor is <ins>record oriented</ins>; it reads formatted input flowfile content using a Record Reader (for further information see [NiFi Documentation](https://nifi.apache.org/docs/nifi-docs/html/record-path-guide.html#overview)). 
@@ -124,6 +136,7 @@ An *example* for reading values from a S7-1200:
 - *PLC connection String:* *s7://10.105.143.7:102?remote-rack=0&remote-slot=1&controller-type=S7_1200*
 - *Record Writer:* *PLC4x Embedded - AvroRecordSetWriter*
 - *Read timeout (miliseconds):* *10000*
+- *Timestamp field name:* *timestamp*  
 - *var1:* *%DB1:DBX0.0:BOOL*
 - *var2:* *%DB1:DBX0.1:BOOL*
 - *var3:* *%DB1:DBB01:BYTE*
@@ -162,6 +175,6 @@ The output flowfile will contain the PLC read values. This information is includ
   "var3" : "\u0005",
   "var5" : 1992,
   "var4" : "4",
-  "ts" : 1628783058433
+  "timestamp" : 1628783058433
 } ]
 ```
\ No newline at end of file
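
The README additions above describe the connection string and the JSON address mappings declaratively. For orientation, here is a minimal standalone sketch (not part of this commit) of what those settings correspond to in the plain PLC4X Java API, reusing the connection string and the two S7 addresses from the README examples; the class name and the 10-second timeout are illustrative, and it needs a reachable PLC to actually run.

```java
import java.util.concurrent.TimeUnit;

import org.apache.plc4x.java.DefaultPlcDriverManager;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;

public class ReadmeAddressExample {
    public static void main(String[] args) throws Exception {
        String connectionString =
            "s7://10.105.143.7:102?remote-rack=0&remote-slot=1&controller-type=S7_1200";
        try (PlcConnection connection = new DefaultPlcDriverManager().getConnection(connectionString)) {
            // The same name -> address pairs the README's address JSON declares.
            PlcReadRequest request = connection.readRequestBuilder()
                .addTagAddress("var1", "%DB1:DBX0.0:BOOL")
                .addTagAddress("var2", "%DB1:DBX0.1:BOOL")
                .build();
            // 10000 ms mirrors the processors' default timeout property.
            PlcReadResponse response = request.execute().get(10_000, TimeUnit.MILLISECONDS);
            for (String tagName : response.getTagNames()) {
                System.out.println(tagName + " = " + response.getObject(tagName));
            }
        }
    }
}
```
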
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
index a8da46c46f..b2b05635d8 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/BasePlc4xProcessor.java
@@ -18,36 +18,49 @@
  */
 package org.apache.plc4x.nifi;
 
+import java.io.OutputStream;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.plc4x.java.api.PlcConnectionManager;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.plc4x.java.DefaultPlcDriverManager;
+import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.PlcDriver;
-import org.apache.plc4x.java.api.PlcDriverManager;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteResponse;
+import org.apache.plc4x.java.api.model.PlcTag;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager;
 import org.apache.plc4x.nifi.address.AddressesAccessStrategy;
 import org.apache.plc4x.nifi.address.AddressesAccessUtils;
+import org.apache.plc4x.nifi.address.DynamicPropertyAccessStrategy;
+import org.apache.plc4x.nifi.record.Plc4xWriter;
 import org.apache.plc4x.nifi.record.SchemaCache;
 
 public abstract class BasePlc4xProcessor extends AbstractProcessor {
@@ -55,28 +68,31 @@ public abstract class BasePlc4xProcessor extends AbstractProcessor {
     protected List<PropertyDescriptor> properties;
     protected Set<Relationship> relationships;
     protected volatile boolean debugEnabled;
-  
-    protected String connectionString;
-    protected Map<String, String> addressMap;
-    protected Long timeout;
+    protected Integer cacheSize = 0;
 
     protected final SchemaCache schemaCache = new SchemaCache(0);
+    protected AddressesAccessStrategy addressAccessStrategy;
 
-    private final PlcConnectionManager connectionManager = CachedPlcConnectionManager.getBuilder()
-        .withMaxLeaseTime(Duration.ofSeconds(1000L))
-        .withMaxWaitTime(Duration.ofSeconds(500L))
-        .build();
+    private CachedPlcConnectionManager connectionManager;
 
-    protected static final List<AllowableValue> addressAccessStrategy = Collections.unmodifiableList(Arrays.asList(
-        AddressesAccessUtils.ADDRESS_PROPERTY,
-        AddressesAccessUtils.ADDRESS_TEXT));
+    protected CachedPlcConnectionManager getConnectionManager() {
+        return connectionManager;
+    }
+
+    protected void refreshConnectionManager() {
+        connectionManager = CachedPlcConnectionManager.getBuilder()
+            .withMaxLeaseTime(Duration.ofSeconds(1000L))
+            .withMaxWaitTime(Duration.ofSeconds(500L))
+            .build();
+    }
 
 
-	protected static final PropertyDescriptor PLC_CONNECTION_STRING = new PropertyDescriptor.Builder()
+	public static final PropertyDescriptor PLC_CONNECTION_STRING = new PropertyDescriptor.Builder()
         .name("plc4x-connection-string")
         .displayName("PLC connection String")
         .description("PLC4X connection string used to connect to a given PLC device.")
         .required(true)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .addValidator(new Plc4xConnectionStringValidator())
         .build();
 	
@@ -86,6 +102,7 @@ public abstract class BasePlc4xProcessor extends AbstractProcessor {
 		.description("Maximum number of entries in the cache. Can improve performance when addresses change dynamically.")
 		.defaultValue("1")
 		.required(true)
+        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
 		.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
 		.build();
 
@@ -95,9 +112,20 @@ public abstract class BasePlc4xProcessor extends AbstractProcessor {
 		.description( "Request timeout in miliseconds")
 		.defaultValue("10000")
 		.required(true)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
 		.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
 		.build();
 
+    public static final PropertyDescriptor PLC_TIMESTAMP_FIELD_NAME = new PropertyDescriptor.Builder()
+        .name("plc4x-timestamp-field-name")
+        .displayName("Timestamp Field Name")
+        .description("Name of the field that will display the timestamp of the operation.")
+        .required(true)
+        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+        .addValidator(new Plc4xTimestampFieldValidator())
+        .defaultValue("ts")
+        .build();
+
 
     protected static final Relationship REL_SUCCESS = new Relationship.Builder()
 	    .name("success")
@@ -117,8 +145,10 @@ public abstract class BasePlc4xProcessor extends AbstractProcessor {
     	properties.add(PLC_CONNECTION_STRING);
         properties.add(AddressesAccessUtils.PLC_ADDRESS_ACCESS_STRATEGY);
         properties.add(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY);
+        properties.add(AddressesAccessUtils.ADDRESS_FILE_PROPERTY);
         properties.add(PLC_SCHEMA_CACHE_SIZE);
         properties.add(PLC_FUTURE_TIMEOUT_MILISECONDS);
+        properties.add(PLC_TIMESTAMP_FIELD_NAME);
         this.properties = Collections.unmodifiableList(properties);
 
     	
@@ -129,12 +159,19 @@ public abstract class BasePlc4xProcessor extends AbstractProcessor {
     }
 
     public Map<String, String> getPlcAddressMap(ProcessContext context, FlowFile flowFile) {
-        AddressesAccessStrategy strategy = AddressesAccessUtils.getAccessStrategy(context);
-        return strategy.extractAddresses(context, flowFile);
+        return addressAccessStrategy.extractAddresses(context, flowFile);
     }
     
-    public String getConnectionString() {
-        return connectionString;
+    public String getConnectionString(ProcessContext context, FlowFile flowFile) {
+        return context.getProperty(PLC_CONNECTION_STRING).evaluateAttributeExpressions(flowFile).getValue();
+    }
+
+    public Long getTimeout(ProcessContext context, FlowFile flowFile) {
+        return context.getProperty(PLC_FUTURE_TIMEOUT_MILISECONDS).evaluateAttributeExpressions(flowFile).asLong();
+    }
+
+    public String getTimestampField(ProcessContext context) {
+        return context.getProperty(PLC_TIMESTAMP_FIELD_NAME).evaluateAttributeExpressions().getValue();
     }
 
     public SchemaCache getSchemaCache() {
@@ -151,14 +188,14 @@ public abstract class BasePlc4xProcessor extends AbstractProcessor {
         return properties;
     }
     
-    //dynamic prop
     @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
         return new PropertyDescriptor.Builder()
                 .name(propertyDescriptorName)
-                .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
                 .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
                 .dependsOn(AddressesAccessUtils.PLC_ADDRESS_ACCESS_STRATEGY, AddressesAccessUtils.ADDRESS_PROPERTY)
+                .addValidator(new DynamicPropertyAccessStrategy.TagValidator(AddressesAccessUtils.getManager()))
                 .required(false)
                 .dynamic(true)
                 .build();
@@ -167,10 +204,14 @@ public abstract class BasePlc4xProcessor extends AbstractProcessor {
 
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
-		connectionString = context.getProperty(PLC_CONNECTION_STRING.getName()).getValue();
-        schemaCache.restartCache(context.getProperty(PLC_SCHEMA_CACHE_SIZE).asInteger());
+        Integer newCacheSize = context.getProperty(PLC_SCHEMA_CACHE_SIZE).evaluateAttributeExpressions().asInteger();
+        if (!newCacheSize.equals(cacheSize)){
+            schemaCache.restartCache(newCacheSize);
+            cacheSize = newCacheSize;
+        }
+        refreshConnectionManager();
         debugEnabled = getLogger().isDebugEnabled();
-        timeout = context.getProperty(PLC_FUTURE_TIMEOUT_MILISECONDS.getName()).asLong();
+        addressAccessStrategy = AddressesAccessUtils.getAccessStrategy(context);
     }
 
     @Override
@@ -186,37 +227,162 @@ public abstract class BasePlc4xProcessor extends AbstractProcessor {
         }
         BasePlc4xProcessor that = (BasePlc4xProcessor) o;
         return Objects.equals(properties, that.properties) &&
-            Objects.equals(getRelationships(), that.getRelationships()) &&
-            Objects.equals(getConnectionString(), that.getConnectionString());
+            Objects.equals(getRelationships(), that.getRelationships());
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(super.hashCode(), properties, getRelationships(), getConnectionString());
+        return Objects.hash(super.hashCode(), properties, getRelationships());
     }
 
-    public static class Plc4xConnectionStringValidator implements Validator {
-        @Override
-        public ValidationResult validate(String subject, String input, ValidationContext context) {
-            // TODO: Add validation here ...
-            return new ValidationResult.Builder().subject(subject).explanation("").valid(true).build();
+    protected PlcWriteRequest getWriteRequest(final ComponentLog logger,
+            final Map<String, String> addressMap, final Map<String, PlcTag> tags, final Map<String, ? extends Object> presentTags,
+            final PlcConnection connection, final AtomicLong nrOfRowsHere) {
+
+        PlcWriteRequest.Builder builder = connection.writeRequestBuilder();
+
+        if (tags != null){
+            for (Map.Entry<String,PlcTag> tag : tags.entrySet()){
+                if (presentTags.containsKey(tag.getKey())) {
+                    builder.addTag(tag.getKey(), tag.getValue(), presentTags.get(tag.getKey()));
+                    if (nrOfRowsHere != null) {
+                        nrOfRowsHere.incrementAndGet();
+                    }
+                } else {
+                    if (debugEnabled)
+                        logger.debug("PlcTag " + tag + " is declared as address but was not found on input record.");
+                }
+            }
+        } else {
+            if (debugEnabled)
+                logger.debug("PlcTypes resolution not found in cache and will be added with key: " + addressMap);
+            for (Map.Entry<String,String> entry: addressMap.entrySet()){
+                if (presentTags.containsKey(entry.getKey())) {
+                    builder.addTagAddress(entry.getKey(), entry.getValue(), presentTags.get(entry.getKey()));
+                    if (nrOfRowsHere != null) {
+                        nrOfRowsHere.incrementAndGet();
+                    }
+                }
+            }
+        }
+         
+        return builder.build();
+    }
+
+    protected PlcReadRequest getReadRequest(final ComponentLog logger, 
+            final Map<String, String> addressMap, final Map<String, PlcTag> tags,
+            final PlcConnection connection) {
+
+        PlcReadRequest.Builder builder = connection.readRequestBuilder();
+
+        if (tags != null){
+            for (Map.Entry<String,PlcTag> tag : tags.entrySet()){
+                builder.addTag(tag.getKey(), tag.getValue());
+            }
+        } else {
+            if (debugEnabled)
+                logger.debug("Plc-Avro schema and PlcTypes resolution not found in cache and will be added with key: " + addressMap);
+            for (Map.Entry<String,String> entry: addressMap.entrySet()){
+                builder.addTagAddress(entry.getKey(), entry.getValue());
+            }
+        }
+        return builder.build();
+	}
+
+    protected void evaluateWriteResponse(final ComponentLog logger, Map<String, ? extends Object> values, PlcWriteResponse plcWriteResponse) {
+
+		boolean codeErrorPresent = false;
+		List<String> tagsAtError = null;
+
+		PlcResponseCode code = null;
+
+		for (String tag : plcWriteResponse.getTagNames()) {
+			code = plcWriteResponse.getResponseCode(tag);
+			if (!code.equals(PlcResponseCode.OK)) {
+				if (tagsAtError == null) {
+					tagsAtError = new ArrayList<>();
+				}
+				logger.error("Not OK code when writing the data to PLC for tag " + tag 
+					+ " with value " + values.get(tag).toString() 
+					+ " in address " + plcWriteResponse.getTag(tag).getAddressString());
+				
+			codeErrorPresent = true;
+			tagsAtError.add(tag);
+						
+			}
+		}
+		if (codeErrorPresent) {
+			throw new ProcessException("At least one error was found while writing tags: " + tagsAtError.toString());
+		}
+	}
+
+   protected void evaluateReadResponse(final ProcessSession session, final FlowFile flowFile, final PlcReadResponse response) {
+        Map<String, String> attributes = new HashMap<>();
+        for (String tagName : response.getTagNames()) {
+            for (int i = 0; i < response.getNumberOfValues(tagName); i++) {
+                Object value = response.getObject(tagName, i);
+                attributes.put(tagName, String.valueOf(value));
+            }
         }
+        session.putAllAttributes(flowFile, attributes);
     }
 
-    public static class Plc4xAddressStringValidator implements Validator {
+    protected long evaluateReadResponse(final ProcessContext context, final ComponentLog logger, final FlowFile originalFlowFile,
+			Plc4xWriter plc4xWriter, OutputStream out, final RecordSchema recordSchema, PlcReadResponse readResponse)
+			throws Exception {
+
+		if(originalFlowFile == null) // there are no inherited attributes to use in the writer service
+			return plc4xWriter.writePlcReadResponse(readResponse, out, logger, null, recordSchema, getTimestampField(context));
+		else 
+			return plc4xWriter.writePlcReadResponse(readResponse, out, logger, null, recordSchema, originalFlowFile, getTimestampField(context));
+	}
+
+    protected static class Plc4xConnectionStringValidator implements Validator {
         @Override
         public ValidationResult validate(String subject, String input, ValidationContext context) {
-            // TODO: Add validation here ...
-            return new ValidationResult.Builder().subject(subject).explanation("").valid(true).build();
+            DefaultPlcDriverManager manager = new DefaultPlcDriverManager();
+            
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+            }
+            try {
+                PlcDriver driver =  manager.getDriverForUrl(input);
+                driver.getConnection(input);
+            } catch (PlcConnectionException e) {
+                return new ValidationResult.Builder().subject(subject)
+                    .explanation(e.getMessage())
+                    .valid(false)
+                    .build();
+            }
+            return new ValidationResult.Builder().subject(subject)
+                .explanation("")
+                .valid(true)
+                .build();
         }
     }
 
-    protected PlcConnectionManager getConnectionManager() {
-        return connectionManager;
-    }
+    protected static class Plc4xTimestampFieldValidator implements Validator {
+        @Override
+        public ValidationResult validate(String subject, String input, ValidationContext context) {
 
-    protected PlcDriver getDriver() throws PlcConnectionException {
-        return PlcDriverManager.getDefault().getDriverForUrl(connectionString);
-    }
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+            }
+            
+            Map<String, String> allProperties = context.getAllProperties();
+            allProperties.remove(subject);
+
+            if (allProperties.containsValue(input)) {
+                return new ValidationResult.Builder().subject(subject)
+                    .explanation("Timestamp field must be unique")
+                    .valid(false)
+                    .build(); 
+            }
+            return new ValidationResult.Builder().subject(subject)
+                .explanation("")
+                .valid(true)
+                .build();
 
+        }
+    }
 }
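
The refactored base processor replaces the static connection manager with one that is rebuilt in onScheduled() and whose cached entries are evicted after a timeout (see removeCachedConnection in the sink and source processors below). A hedged sketch of that pattern in isolation, with illustrative class, method, and address names:

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager;

public class CachedConnectionSketch {

    // Same builder settings the base processor uses when it (re)creates its manager.
    private CachedPlcConnectionManager connectionManager = CachedPlcConnectionManager.getBuilder()
        .withMaxLeaseTime(Duration.ofSeconds(1000L))
        .withMaxWaitTime(Duration.ofSeconds(500L))
        .build();

    public void readOnce(String connectionString, long timeoutMillis) throws Exception {
        try (PlcConnection connection = connectionManager.getConnection(connectionString)) {
            PlcReadResponse response = connection.readRequestBuilder()
                .addTagAddress("var1", "%DB1:DBX0.0:BOOL") // illustrative S7 address
                .build()
                .execute()
                .get(timeoutMillis, TimeUnit.MILLISECONDS);
            System.out.println(response.getResponseCode("var1"));
        } catch (TimeoutException e) {
            // Mirrors the processors: evict the possibly broken cached connection so the
            // next trigger gets a fresh one, then propagate the error.
            connectionManager.removeCachedConnection(connectionString);
            throw e;
        }
    }
}
```
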
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xListenRecordProcessor.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xListenRecordProcessor.java
index 0664fa0015..4b6e47b719 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xListenRecordProcessor.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xListenRecordProcessor.java
@@ -30,7 +30,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -85,6 +84,7 @@ public class Plc4xListenRecordProcessor extends BasePlc4xProcessor {
 	protected Plc4xListenerDispatcher dispatcher;
 	protected RecordSchema recordSchema;
 	protected Thread readerThread;
+	protected Map<String, String> addressMap;
 	final StopWatch executeTime = new StopWatch(false);
 
 	public static final PropertyDescriptor PLC_RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
@@ -112,21 +112,7 @@ public class Plc4xListenRecordProcessor extends BasePlc4xProcessor {
 		.dependsOn(PLC_SUBSCRIPTION_TYPE, Plc4xSubscriptionType.CYCLIC.name())
 		.required(true)
         .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-		.addValidator(new Validator() {
-			@Override
-			public ValidationResult validate(String subject, String input, ValidationContext context) {
-				if (context.getProperty(PLC_FUTURE_TIMEOUT_MILISECONDS).asLong() > Long.valueOf(input)) {
-					return new ValidationResult.Builder().valid(true).build();
-				} else {
-					return new ValidationResult.Builder()
-					.valid(false)
-					.input(input)
-					.subject(PLC_SUBSCRIPTION_CYCLIC_POLLING_INTERVAL.getDisplayName())
-					.explanation(String.format("it must me smaller than the value of %s", PLC_FUTURE_TIMEOUT_MILISECONDS.getDisplayName()))
-					.build();
-				}
-			}	
-		})
+		.addValidator(new CyclycPollingIntervalValidator())
         .defaultValue("10000")
 		.build();
 
@@ -151,20 +137,19 @@ public class Plc4xListenRecordProcessor extends BasePlc4xProcessor {
 		super.onScheduled(context);
 		subscriptionType = Plc4xSubscriptionType.valueOf(context.getProperty(PLC_SUBSCRIPTION_TYPE).getValue());
         cyclingPollingInterval = context.getProperty(PLC_SUBSCRIPTION_CYCLIC_POLLING_INTERVAL).asLong();
-		addressMap = getPlcAddressMap(context, null);
-
-		createDispatcher(events);
+		createDispatcher(context, events);
 	}
 
-    protected void createDispatcher(final BlockingQueue<PlcSubscriptionEvent> events) {
+    protected void createDispatcher(final ProcessContext context, final BlockingQueue<PlcSubscriptionEvent> events) {
 		if (readerThread != null) {
 			return;
 		}
 
 		// create the dispatcher and calls open() to start listening to the plc subscription
-        dispatcher =  new Plc4xListenerDispatcher(timeout, subscriptionType, cyclingPollingInterval, getLogger(), events);
+        dispatcher =  new Plc4xListenerDispatcher(getTimeout(context, null), subscriptionType, cyclingPollingInterval, getLogger(), events);
 		try {
-			dispatcher.open(getConnectionString(), addressMap);
+			addressMap = getPlcAddressMap(context, null);
+			dispatcher.open(getConnectionString(context, null), addressMap);
 		} catch (Exception e) {
 			if (debugEnabled) {
 				getLogger().debug("Error creating a the subscription event dispatcher");
@@ -198,7 +183,7 @@ public class Plc4xListenRecordProcessor extends BasePlc4xProcessor {
 		}
     }
 
-	protected PlcSubscriptionEvent getMessage() {
+	protected PlcSubscriptionEvent getMessage(final ProcessContext context) {
 		if (readerThread != null && readerThread.isAlive()) {
 			return events.poll();
 			
@@ -208,14 +193,14 @@ public class Plc4xListenRecordProcessor extends BasePlc4xProcessor {
 			getLogger().debug("Connection to Plc broke. Trying to restart connection");
 		}
 		closeDispatcher();
-		createDispatcher(events);
+		createDispatcher(context, events);
 		throw new ProcessException("Connection to Plc broke. Trying to restart connection");
 	}
 	
 	@Override
 	public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
 
-		DefaultPlcSubscriptionEvent event = (DefaultPlcSubscriptionEvent) getMessage();
+		DefaultPlcSubscriptionEvent event = (DefaultPlcSubscriptionEvent) getMessage(context);
 
 		if (event == null) {
 			return;
@@ -226,70 +211,25 @@ public class Plc4xListenRecordProcessor extends BasePlc4xProcessor {
 
 		final AtomicLong nrOfRows = new AtomicLong(0L);
 
-		FlowFile resultSetFF;
-		resultSetFF = session.create();
+		FlowFile resultSetFF = session.create();
 
 		Plc4xWriter plc4xWriter = new RecordPlc4xWriter(context.getProperty(PLC_RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class), Collections.emptyMap());
 
 		try {
-			resultSetFF = session.write(resultSetFF, out -> {
+			session.write(resultSetFF, out -> {
 				try {
-					nrOfRows.set(plc4xWriter.writePlcReadResponse(event, out, getLogger(), null, recordSchema));
+					nrOfRows.set(plc4xWriter.writePlcReadResponse(event, out, getLogger(), null, recordSchema, getTimestampField(context)));
 				}  catch (Exception e) {
 					getLogger().error("Exception reading the data from PLC", e);
 					throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
 				}
 
 				if (recordSchema == null){
-					if (debugEnabled)
-						getLogger().debug("Adding Plc-Avro schema and PlcTypes resolution into cache with key: " + addressMap.toString());
-					
-					// Add schema to the cache
-					LinkedHashSet<String> addressNames = new LinkedHashSet<String>();
-					addressNames.addAll(event.getTagNames());
-					
-					List<PlcTag> addressTags = addressNames.stream().map(
-						new Function<String,PlcTag>() {
-							@Override
-							public PlcTag apply(String addr) {
-								return new PlcTag() {
-									@Override
-									public String getAddressString() {
-										return addr;
-									}
-
-									@Override
-									public PlcValueType getPlcValueType() {
-										return event.getPlcValue(addr).getPlcValueType();
-									}
-								};
-							}
-						}).collect(Collectors.toList()); 
-
-					getSchemaCache().addSchema(
-						addressMap, 
-						addressNames,
-						addressTags,
-						plc4xWriter.getRecordSchema()
-					);
-					recordSchema = getSchemaCache().retrieveSchema(addressMap);
+					addTagsToCache(event, plc4xWriter);
 				}
 			});
-			long executionTimeElapsed = executeTime.getElapsed(TimeUnit.MILLISECONDS);
-			executeTime.stop();
-			
-			final Map<String, String> attributesToAdd = new HashMap<>();
-			attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
-			attributesToAdd.put(RESULT_LAST_EVENT, String.valueOf(executionTimeElapsed));
-
-			attributesToAdd.putAll(plc4xWriter.getAttributesToAdd());
-			resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd);
-			plc4xWriter.updateCounters(session);
-			getLogger().info("{} contains {} records; transferring to 'success'", resultSetFF, nrOfRows.get());
-			
-			session.getProvenanceReporter().receive(resultSetFF, "Retrieved " + nrOfRows.get() + " rows from subscription", executionTimeElapsed);
+			resultSetFF = completeResultFlowFile(session, nrOfRows, resultSetFF, plc4xWriter);
 			session.transfer(resultSetFF, REL_SUCCESS);
-			session.commitAsync();
 
 			executeTime.start();
 
@@ -298,4 +238,71 @@ public class Plc4xListenRecordProcessor extends BasePlc4xProcessor {
 			throw new ProcessException("Got an error while trying to get a subscription event", e);
 		}
 	}
+
+	private void addTagsToCache(DefaultPlcSubscriptionEvent event, Plc4xWriter plc4xWriter) {
+		if (debugEnabled)
+			getLogger().debug("Adding Plc-Avro schema and PlcTypes resolution into cache with key: " + addressMap.toString());
+		
+		// Add schema to the cache
+		LinkedHashSet<String> addressNames = new LinkedHashSet<>();
+		addressNames.addAll(event.getTagNames());
+		
+		List<PlcTag> addressTags = addressNames.stream().map(addr -> 
+				new PlcTag() {
+					@Override
+					public String getAddressString() {
+						return addr;
+					}
+
+					@Override
+					public PlcValueType getPlcValueType() {
+						return event.getPlcValue(addr).getPlcValueType();
+					}
+				}
+			).collect(Collectors.toList()); 
+
+		getSchemaCache().addSchema(
+			addressMap, 
+			addressNames,
+			addressTags,
+			plc4xWriter.getRecordSchema()
+		);
+		recordSchema = getSchemaCache().retrieveSchema(addressMap);
+	}
+
+	private FlowFile completeResultFlowFile(final ProcessSession session, final AtomicLong nrOfRows, FlowFile resultSetFF,
+			Plc4xWriter plc4xWriter) {
+				
+		long executionTimeElapsed = executeTime.getElapsed(TimeUnit.MILLISECONDS);
+		executeTime.stop();
+		
+		final Map<String, String> attributesToAdd = new HashMap<>();
+		attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+		attributesToAdd.put(RESULT_LAST_EVENT, String.valueOf(executionTimeElapsed));
+
+		attributesToAdd.putAll(plc4xWriter.getAttributesToAdd());
+		resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd);
+		plc4xWriter.updateCounters(session);
+		getLogger().info("{} contains {} records; transferring to 'success'", resultSetFF, nrOfRows.get());
+		
+		session.getProvenanceReporter().receive(resultSetFF, "Retrieved " + nrOfRows.get() + " rows from subscription", executionTimeElapsed);
+		return resultSetFF;
+	}
+
+
+	protected static class CyclycPollingIntervalValidator implements Validator {
+		@Override
+		public ValidationResult validate(String subject, String input, ValidationContext context) {
+			if (context.getProperty(PLC_FUTURE_TIMEOUT_MILISECONDS).asLong() > Long.valueOf(input)) {
+				return new ValidationResult.Builder().valid(true).build();
+			} else {
+				return new ValidationResult.Builder()
+				.valid(false)
+				.input(input)
+				.subject(PLC_SUBSCRIPTION_CYCLIC_POLLING_INTERVAL.getDisplayName())
+				.explanation(String.format("it must be smaller than the value of %s", PLC_FUTURE_TIMEOUT_MILISECONDS.getDisplayName()))
+				.build();
+			}
+		}	
+	}
 }
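
Plc4xListenRecordProcessor keeps the dispatcher/onTrigger hand-off pattern: a background reader thread feeds subscription events into a BlockingQueue and onTrigger() drains it, recreating the dispatcher when the thread dies. A stripped-down sketch of that pattern using plain JDK types (the String events and the names stand in for PlcSubscriptionEvent and the real Plc4xListenerDispatcher):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ListenHandOffSketch {

    private final BlockingQueue<String> events = new LinkedBlockingQueue<>();
    private Thread readerThread;

    void startDispatcher() {
        if (readerThread != null) {
            return; // already listening, mirrors the guard in createDispatcher()
        }
        readerThread = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    events.put("subscription-event"); // stand-in for a PlcSubscriptionEvent
                    TimeUnit.SECONDS.sleep(1);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        readerThread.setDaemon(true);
        readerThread.start();
    }

    String poll() {
        if (readerThread != null && readerThread.isAlive()) {
            return events.poll(); // non-blocking, may return null when no event has arrived yet
        }
        // The real processor recreates the dispatcher and throws a ProcessException here.
        return null;
    }
}
```
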
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
index 9c1cb758e7..f18b35e893 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkProcessor.java
@@ -20,6 +20,7 @@ package org.apache.plc4x.nifi;
 
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.ReadsAttribute;
@@ -36,7 +37,6 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.plc4x.java.api.PlcConnection;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.apache.plc4x.java.api.messages.PlcWriteResponse;
-import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.apache.plc4x.java.api.model.PlcTag;
 
 @TriggerSerially
@@ -47,83 +47,62 @@ import org.apache.plc4x.java.api.model.PlcTag;
 @ReadsAttributes({@ReadsAttribute(attribute="value", description="some value")})
 public class Plc4xSinkProcessor extends BasePlc4xProcessor {
 
+	public static final String EXCEPTION = "plc4x.write.exception";
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         FlowFile flowFile = session.get();
-        final ComponentLog logger = getLogger();
-
+        
         // Abort if there's nothing to do.
         if (flowFile == null) {
             return;
         }
 
-        // Get an instance of a component able to write to a PLC.
-        try(PlcConnection connection = getConnectionManager().getConnection(getConnectionString())) {
+        final ComponentLog logger = getLogger();
+
+        try(PlcConnection connection = getConnectionManager().getConnection(getConnectionString(context, flowFile))) {
             if (!connection.getMetadata().canWrite()) {
                 throw new ProcessException("Writing not supported by connection");
             }
-
-            // Prepare the request.
-            PlcWriteRequest.Builder builder = connection.writeRequestBuilder();
-            Map<String,String> addressMap = getPlcAddressMap(context, flowFile);
+            
+            final Map<String,String> addressMap = getPlcAddressMap(context, flowFile);
             final Map<String, PlcTag> tags = getSchemaCache().retrieveTags(addressMap);
 
-            if (tags != null){
-                for (Map.Entry<String,PlcTag> tag : tags.entrySet()){
-                    if (flowFile.getAttributes().containsKey(tag.getKey())) {
-                        builder.addTag(tag.getKey(), tag.getValue(), flowFile.getAttribute(tag.getKey()));
-                    } else {
-                        if (debugEnabled)
-                            logger.debug("PlcTag " + tag + " is declared as address but was not found on input record.");
-                    }
-                }
-            } else {
-                for (Map.Entry<String,String> entry: addressMap.entrySet()){
-                    if (flowFile.getAttributes().containsKey(entry.getKey())) {
-                        builder.addTagAddress(entry.getKey(), entry.getValue(), flowFile.getAttribute(entry.getKey()));
-                    }
-                }
-                if (debugEnabled)
-                    logger.debug("PlcTypes resolution not found in cache and will be added with key: " + addressMap);
-            }
-           
-            PlcWriteRequest writeRequest = builder.build();
+            PlcWriteRequest writeRequest = getWriteRequest(logger, addressMap, tags, flowFile.getAttributes(), connection, null);
 
-            // Send the request to the PLC.
             try {
-                final PlcWriteResponse plcWriteResponse = writeRequest.execute().get(this.timeout, TimeUnit.MILLISECONDS);
-                PlcResponseCode code = null;
-
-                for (String tag : plcWriteResponse.getTagNames()) {
-                    code = plcWriteResponse.getResponseCode(tag);
-                    if (!code.equals(PlcResponseCode.OK)) {
-                        logger.error("Not OK code when writing the data to PLC for tag " + tag 
-								+ " with value  " + flowFile.getAttribute(tag)
-								+ " in addresss " + plcWriteResponse.getTag(tag).getAddressString());
-                        throw new Exception(code.toString());
-                    }
-                }
-                session.transfer(flowFile, REL_SUCCESS);
+                final PlcWriteResponse plcWriteResponse = writeRequest.execute().get(getTimeout(context, flowFile), TimeUnit.MILLISECONDS);
 
-                if (tags == null){
-                    if (debugEnabled)
-                        logger.debug("Adding PlcTypes resolution into cache with key: " + addressMap);
-                    getSchemaCache().addSchema(
-                        addressMap, 
-                        writeRequest.getTagNames(),
-                        writeRequest.getTags(),
-                        null
-                    );
-                }
+                evaluateWriteResponse(logger, flowFile.getAttributes(), plcWriteResponse);
+ 
+            } catch (TimeoutException e) {
+                logger.error("Timeout writing the data to the PLC", e);
+                getConnectionManager().removeCachedConnection(getConnectionString(context, flowFile));
+                throw new ProcessException(e);
             } catch (Exception e) {
-                flowFile = session.putAttribute(flowFile, "exception", e.getLocalizedMessage());
-                session.transfer(flowFile, REL_FAILURE);
+                logger.error("Exception writing the data to the PLC", e);
+                throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
             }
 
-        } catch (ProcessException e) {
-            throw e;
+            session.transfer(flowFile, REL_SUCCESS);
+
+            if (tags == null){
+                if (debugEnabled)
+                    logger.debug("Adding PlcTypes resolution into cache with key: " + addressMap);
+                getSchemaCache().addSchema(
+                    addressMap, 
+                    writeRequest.getTagNames(),
+                    writeRequest.getTags(),
+                    null
+                );
+            }
+
+
         } catch (Exception e) {
-            throw new ProcessException("Got an error while trying to get a connection", e);
+            flowFile = session.putAttribute(flowFile, EXCEPTION, e.getLocalizedMessage());
+            session.transfer(flowFile, REL_FAILURE);
+            session.commitAsync();
+            throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
         }
     }
 
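
For Plc4xSinkProcessor the contract after this change is: the value to write is taken from a flowfile attribute whose name matches the dynamic property, and on any failure the flowfile is routed to failure carrying the new plc4x.write.exception attribute. A hedged sketch of exercising that with nifi-mock; this is not the project's own test, and the simulated connection string and address syntax are assumptions:

```java
import java.util.Map;

import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.plc4x.nifi.Plc4xSinkProcessor;

public class SinkProcessorSketch {
    public static void main(String[] args) {
        TestRunner runner = TestRunners.newTestRunner(Plc4xSinkProcessor.class);
        runner.setProperty(Plc4xSinkProcessor.PLC_CONNECTION_STRING, "simulated://127.0.0.1");
        // Dynamic property: tag name -> PLC address (assumed simulated-driver syntax).
        runner.setProperty("var1", "STATE/foo:BOOL");
        // The flowfile attribute with the same name carries the value to write.
        runner.enqueue(new byte[0], Map.of("var1", "true"));
        runner.run();
        // On failure the flowfile carries the new plc4x.write.exception attribute.
        System.out.println("success: " + runner.getFlowFilesForRelationship("success").size());
        System.out.println("failure: " + runner.getFlowFilesForRelationship("failure").size());
    }
}
```
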
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessor.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessor.java
index 7f58803b45..661f919266 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessor.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessor.java
@@ -18,7 +18,6 @@
  */
 package org.apache.plc4x.nifi;
 
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -27,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -52,10 +52,10 @@ import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.util.StopWatch;
 import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.messages.PlcWriteRequest;
 import org.apache.plc4x.java.api.messages.PlcWriteResponse;
 import org.apache.plc4x.java.api.model.PlcTag;
-import org.apache.plc4x.java.api.types.PlcResponseCode;
 
 @TriggerSerially
 @Tags({"plc4x", "put", "sink", "record"})
@@ -73,6 +73,7 @@ public class Plc4xSinkRecordProcessor extends BasePlc4xProcessor {
 	public static final String RESULT_ROW_COUNT = "plc4x.write.row.count";
 	public static final String RESULT_QUERY_EXECUTION_TIME = "plc4x.write.query.executiontime";
 	public static final String INPUT_FLOWFILE_UUID = "input.flowfile.uuid";
+	public static final String EXCEPTION = "plc4x.write.exception";
 	
 	public static final PropertyDescriptor PLC_RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
 			.name("record-reader").displayName("Record Reader")
@@ -105,116 +106,90 @@ public class Plc4xSinkRecordProcessor extends BasePlc4xProcessor {
         }
 			
 		final ComponentLog logger = getLogger();
+
 		// Get an instance of a component able to read from a PLC.
 		final AtomicLong nrOfRows = new AtomicLong(0L);
 		final StopWatch executeTime = new StopWatch(true);
 
-		final FlowFile originalFlowFile = fileToProcess;
-
-		InputStream in = session.read(originalFlowFile);
-
-		Record record = null;
-		
-		try (RecordReader recordReader = context.getProperty(PLC_RECORD_READER_FACTORY)
-			.asControllerService(RecordReaderFactory.class)
-			.createRecordReader(originalFlowFile, in, logger)){
-
-			while ((record = recordReader.nextRecord()) != null) {
-				long nrOfRowsHere = 0L;
-				PlcWriteResponse plcWriteResponse;
-				PlcWriteRequest writeRequest;
-
-				Map<String,String> addressMap = getPlcAddressMap(context, fileToProcess);
-				final Map<String, PlcTag> tags = getSchemaCache().retrieveTags(addressMap);
-
-				try (PlcConnection connection = getConnectionManager().getConnection(getConnectionString())) {
-					PlcWriteRequest.Builder builder = connection.writeRequestBuilder();
-					
-					
-					if (tags != null){
-						for (Map.Entry<String,PlcTag> tag : tags.entrySet()){
-							if (record.toMap().containsKey(tag.getKey())) {
-								builder.addTag(tag.getKey(), tag.getValue(), record.getValue(tag.getKey()));
-								nrOfRowsHere++;
-							} else {
-								if (debugEnabled)
-                    				logger.debug("PlcTag " + tag + " is declared as address but was not found on input record.");
-							}
+		try {
+			session.read(fileToProcess, in -> {
+				Record record = null;
+			
+				try (RecordReader recordReader = context.getProperty(PLC_RECORD_READER_FACTORY)
+					.asControllerService(RecordReaderFactory.class)
+					.createRecordReader(fileToProcess, in, logger)){
+
+					while ((record = recordReader.nextRecord()) != null) {
+						AtomicLong nrOfRowsHere = new AtomicLong(0);
+						PlcWriteRequest writeRequest;
+
+						final Map<String,String> addressMap = getPlcAddressMap(context, fileToProcess);
+						final Map<String, PlcTag> tags = getSchemaCache().retrieveTags(addressMap);
+
+						try (PlcConnection connection = getConnectionManager().getConnection(getConnectionString(context, fileToProcess))) {
+							
+							writeRequest = getWriteRequest(logger, addressMap, tags, record.toMap(), connection, nrOfRowsHere);
+
+							PlcWriteResponse plcWriteResponse = writeRequest.execute().get(getTimeout(context, fileToProcess), TimeUnit.MILLISECONDS);
+
+							// Response check if values were written
+							evaluateWriteResponse(logger, record.toMap(), plcWriteResponse);
+
+						} catch (TimeoutException e) {
+							logger.error("Timeout writing the data to the PLC", e);
+							getConnectionManager().removeCachedConnection(getConnectionString(context, fileToProcess));
+							throw new ProcessException(e);
+						} catch (PlcConnectionException e) {
+							logger.error("Error getting the PLC connection", e);
+							throw new ProcessException("Got a PlcConnectionException while trying to get a connection", e);
+						} catch (Exception e) {
+							logger.error("Exception writing the data to the PLC", e);
+							throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
 						}
-					} else {
-						if (debugEnabled)
-                   			logger.debug("Plc-Avro schema and PlcTypes resolution not found in cache and will be added with key: " + addressMap);
-						for (Map.Entry<String,String> entry: addressMap.entrySet()){
-							if (record.toMap().containsKey(entry.getKey())) {
-								builder.addTagAddress(entry.getKey(), entry.getValue(), record.getValue(entry.getKey()));
-								nrOfRowsHere++;
-							} else {
-								if (debugEnabled)
-                    				logger.debug("PlcTag " + entry.getKey() + " with address " + entry.getValue() + " was not found on input record.");
-							}
+							
+						if (tags == null){
+							if (debugEnabled)
+								logger.debug("Adding PlcTypes resolution into cache with key: " + addressMap);
+							getSchemaCache().addSchema(
+								addressMap, 
+								writeRequest.getTagNames(),
+								writeRequest.getTags(),
+								null
+							);
 						}
-					}
-					writeRequest = builder.build();
-
-					plcWriteResponse = writeRequest.execute().get(this.timeout, TimeUnit.MILLISECONDS);
+						nrOfRows.getAndAdd(nrOfRowsHere.get());
 
+						
+					}
 				} catch (Exception e) {
-					in.close();
-					logger.error("Exception writing the data to PLC", e);
-					session.transfer(originalFlowFile, REL_FAILURE);
-					session.commitAsync();
 					throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
-				}
-
-				// Response check if values were written
-				if (plcWriteResponse != null){
-					PlcResponseCode code = null;
-
-					for (String tag : plcWriteResponse.getTagNames()) {
-						code = plcWriteResponse.getResponseCode(tag);
-						if (!code.equals(PlcResponseCode.OK)) {
-							logger.error("Not OK code when writing the data to PLC for tag " + tag 
-								+ " with value  " + record.getValue(tag).toString() 
-								+ " in addresss " + plcWriteResponse.getTag(tag).getAddressString());
-							throw new ProcessException("Writing response code for " + plcWriteResponse.getTag(tag).getAddressString() + "was " + code.name() + ", expected OK");
-						}
-					}
-					if (tags == null && writeRequest != null){
-						if (debugEnabled)
-							logger.debug("Adding PlcTypes resolution into cache with key: " + addressMap);
-						getSchemaCache().addSchema(
-							addressMap, 
-							writeRequest.getTagNames(),
-							writeRequest.getTags(),
-							null
-						);
-					}
-					nrOfRows.getAndAdd(nrOfRowsHere);
-				}
-			}
-			in.close();
-		} catch (Exception e) {
-			throw new ProcessException(e);
+				} 
+			});
+
+		} catch (ProcessException e) {
+			logger.error("Exception writing the data to the PLC", e);
+			session.putAttribute(fileToProcess, EXCEPTION, e.getLocalizedMessage());
+			session.transfer(fileToProcess, REL_FAILURE);
+			session.commitAsync();
+			throw e;
 		} 
-		
+
+
 		long executionTimeElapsed = executeTime.getElapsed(TimeUnit.MILLISECONDS);
 		final Map<String, String> attributesToAdd = new HashMap<>();
 		attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
 		attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
 		attributesToAdd.put(INPUT_FLOWFILE_UUID, fileToProcess.getAttribute(CoreAttributes.UUID.key()));
 		
-		FlowFile resultSetFF = session.putAllAttributes(originalFlowFile, attributesToAdd);
+		session.putAllAttributes(fileToProcess, attributesToAdd);
 
-		logger.info("Writing {} fields from {} records; transferring to 'success'", new Object[] { nrOfRows.get(), resultSetFF });
-		// Report a FETCH event if there was an incoming flow file, or a RECEIVE event
-		// otherwise
+		session.transfer(fileToProcess, REL_SUCCESS);
+		
+		logger.info("Writing {} fields from {} records; transferring to 'success'", nrOfRows.get(), fileToProcess);
 		if (context.hasIncomingConnection()) {
-			session.getProvenanceReporter().fetch(resultSetFF, "Writted " + nrOfRows.get() + " rows", executionTimeElapsed);
+			session.getProvenanceReporter().fetch(fileToProcess, "Wrote " + nrOfRows.get() + " rows", executionTimeElapsed);
 		} else {
-			session.getProvenanceReporter().receive(resultSetFF, "Writted " + nrOfRows.get() + " rows", executionTimeElapsed);
+			session.getProvenanceReporter().receive(fileToProcess, "Wrote " + nrOfRows.get() + " rows", executionTimeElapsed);
 		}
-
-		session.transfer(resultSetFF, BasePlc4xProcessor.REL_SUCCESS);
-		session.commitAsync();
 	}
 }
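
The record sink now funnels every record through BasePlc4xProcessor#getWriteRequest: only the tags present in the record are added to the request, and the row counter tracks how many values were queued. A condensed sketch of that selection logic; the method shape and names are illustrative and only the raw-address branch (no cached PlcTags) is shown:

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;

public class RecordWriteSketch {

    static PlcWriteRequest buildRequest(PlcConnection connection,
                                        Map<String, String> addressMap,
                                        Map<String, Object> recordValues,
                                        AtomicLong rowCounter) {
        PlcWriteRequest.Builder builder = connection.writeRequestBuilder();
        for (Map.Entry<String, String> entry : addressMap.entrySet()) {
            // Only tags that actually appear in the record are written; missing ones are skipped.
            if (recordValues.containsKey(entry.getKey())) {
                builder.addTagAddress(entry.getKey(), entry.getValue(), recordValues.get(entry.getKey()));
                rowCounter.incrementAndGet();
            }
        }
        return builder.build();
    }
}
```
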
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
index 0f57de46d0..a6f2da2e17 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceProcessor.java
@@ -18,10 +18,9 @@
  */
 package org.apache.plc4x.nifi;
 
-import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -46,70 +45,74 @@ import org.apache.plc4x.java.api.model.PlcTag;
 @WritesAttributes({@WritesAttribute(attribute="value", description="some value")})
 public class Plc4xSourceProcessor extends BasePlc4xProcessor {
 
+	public static final String EXCEPTION = "plc4x.read.exception";
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        
+        FlowFile incomingFlowFile = null;
+        if (context.hasIncomingConnection()) {
+            incomingFlowFile = session.get();
+            if (incomingFlowFile == null && context.hasNonLoopConnection()) {
+                return;
+            }
+        }
 
         final ComponentLog logger = getLogger();
-        // Get an instance of a component able to read from a PLC.
-        try(PlcConnection connection = getConnectionManager().getConnection(getConnectionString())) {
+        final FlowFile flowFile = session.create();
+    
+        try(PlcConnection connection = getConnectionManager().getConnection(getConnectionString(context, incomingFlowFile))) {
 
-            // Prepare the request.
             if (!connection.getMetadata().canRead()) {
-                throw new ProcessException("Writing not supported by connection");
+                throw new ProcessException("Reading not supported by connection");
             }
 
-            FlowFile flowFile = session.create();
-            try {
-                PlcReadRequest.Builder builder = connection.readRequestBuilder();
-                Map<String,String> addressMap = getPlcAddressMap(context, flowFile);
-                final Map<String, PlcTag> tags = getSchemaCache().retrieveTags(addressMap);
+            final Map<String,String> addressMap = getPlcAddressMap(context, incomingFlowFile);
+            final Map<String, PlcTag> tags = getSchemaCache().retrieveTags(addressMap);
 
-                if (tags != null){
-                    for (Map.Entry<String,PlcTag> tag : tags.entrySet()){
-                        builder.addTag(tag.getKey(), tag.getValue());
-                    }
-                } else {
-                    if (debugEnabled)
-                        logger.debug("PlcTypes resolution not found in cache and will be added with key: " + addressMap);
-                    for (Map.Entry<String,String> entry: addressMap.entrySet()){
-                        builder.addTagAddress(entry.getKey(), entry.getValue());
-                    }
-                }
 
-                PlcReadRequest readRequest = builder.build();
-                PlcReadResponse response = readRequest.execute().get(this.timeout, TimeUnit.MILLISECONDS);
-                Map<String, String> attributes = new HashMap<>();
-                for (String tagName : response.getTagNames()) {
-                    for (int i = 0; i < response.getNumberOfValues(tagName); i++) {
-                        Object value = response.getObject(tagName, i);
-                        attributes.put(tagName, String.valueOf(value));
-                    }
-                }
-                flowFile = session.putAllAttributes(flowFile, attributes); 
-                
-                if (tags == null){
-                    if (debugEnabled)
-                        logger.debug("Adding PlcTypes resolution into cache with key: " + addressMap);
-                    getSchemaCache().addSchema(
-                        addressMap, 
-                        readRequest.getTagNames(),
-                        readRequest.getTags(),
-                        null
-                    );
-                }
+            PlcReadRequest readRequest = getReadRequest(logger, addressMap, tags, connection);
 
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new ProcessException(e);
-            } catch (ExecutionException e) {
+            try {
+                final PlcReadResponse response = readRequest.execute().get(getTimeout(context, incomingFlowFile), TimeUnit.MILLISECONDS);
+                
+                evaluateReadResponse(session, flowFile, response);
+                
+            } catch (TimeoutException e) {
+                logger.error("Timeout reading the data from PLC", e);
+                getConnectionManager().removeCachedConnection(getConnectionString(context, incomingFlowFile));
                 throw new ProcessException(e);
+            } catch (Exception e) {
+                logger.error("Exception reading the data from PLC", e);
+                throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
+            }
+
+            
+            if (incomingFlowFile != null) {
+                session.remove(incomingFlowFile);
             }
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (ProcessException e) {
-            throw e;
+                
+            if (tags == null){
+                if (debugEnabled)
+                    logger.debug("Adding PlcTypes resolution into cache with key: " + addressMap);
+                getSchemaCache().addSchema(
+                    addressMap, 
+                    readRequest.getTagNames(),
+                    readRequest.getTags(),
+                    null
+                );
+            }
+            
         } catch (Exception e) {
-            throw new ProcessException("Got an error while trying to get a connection", e);
+            session.remove(flowFile);
+            if (incomingFlowFile != null){
+                incomingFlowFile = session.putAttribute(incomingFlowFile, EXCEPTION, e.getLocalizedMessage());
+                session.transfer(incomingFlowFile, REL_FAILURE);
+            }
+            session.commitAsync();
+            throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
         }
     }
-
+    
 }
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
index 6d2e5c246e..c2e53e7625 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessor.java
@@ -35,7 +35,6 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -70,8 +69,10 @@ public class Plc4xSourceRecordProcessor extends BasePlc4xProcessor {
 	public static final String RESULT_ROW_COUNT = "plc4x.read.row.count";
 	public static final String RESULT_QUERY_EXECUTION_TIME = "plc4x.read.query.executiontime";
 	public static final String INPUT_FLOWFILE_UUID = "input.flowfile.uuid";
+	public static final String EXCEPTION = "plc4x.read.exception";
 
-	public static final PropertyDescriptor PLC_RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder().name("plc4x-record-writer").displayName("Record Writer")
+	public static final PropertyDescriptor PLC_RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
+		.name("plc4x-record-writer").displayName("Record Writer")
 		.description("Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer may use Inherit Schema to emulate the inferred schema behavior, i.e. "
 				+ "an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types.")
 		.identifiesControllerService(RecordSetWriterFactory.class)
@@ -89,130 +90,114 @@ public class Plc4xSourceRecordProcessor extends BasePlc4xProcessor {
 		this.properties = Collections.unmodifiableList(pds);
 	}
 
-	@OnScheduled
-	@Override
-	public void onScheduled(final ProcessContext context) {
-		super.onScheduled(context);
-	}
 	
 	@Override
 	public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+		
 		FlowFile fileToProcess = null;
-		// TODO: In the future the processor will be configurable to get the connection from incoming flowfile
 		if (context.hasIncomingConnection()) {
 			fileToProcess = session.get();
-			// If we have no FlowFile, and all incoming connections are self-loops then we
-			// can continue on.
-			// However, if we have no FlowFile and we have connections coming from other
-			// Processors, then we know that we should run only if we have a FlowFile.
+			
 			if (fileToProcess == null && context.hasNonLoopConnection()) {
 				return;
 			}
 		}
-		
-		Plc4xWriter plc4xWriter = new RecordPlc4xWriter(context.getProperty(PLC_RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class), fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
+
 		final ComponentLog logger = getLogger();
+		
+		
 		// Get an instance of a component able to read from a PLC.
-		// TODO: Change this to use NiFi service instead of direct connection
 		final AtomicLong nrOfRows = new AtomicLong(0L);
 		final StopWatch executeTime = new StopWatch(true);
 
-		String inputFileUUID = fileToProcess == null ? null : fileToProcess.getAttribute(CoreAttributes.UUID.key());
-		Map<String, String> inputFileAttrMap = fileToProcess == null ? null : fileToProcess.getAttributes();
-		FlowFile resultSetFF;
+		final FlowFile resultSetFF;
 		if (fileToProcess == null) {
 			resultSetFF = session.create();
 		} else {
 			resultSetFF = session.create(fileToProcess);
-		}
-		if (inputFileAttrMap != null) {
-			resultSetFF = session.putAllAttributes(resultSetFF, inputFileAttrMap);
+			session.putAttribute(resultSetFF, INPUT_FLOWFILE_UUID, fileToProcess.getAttribute(CoreAttributes.UUID.key()));
 		}
 
-		try (PlcConnection connection = getConnectionManager().getConnection(getConnectionString())) {
-			PlcReadRequest.Builder builder = connection.readRequestBuilder();
-			Map<String,String> addressMap = getPlcAddressMap(context, fileToProcess);
-			final RecordSchema recordSchema = getSchemaCache().retrieveSchema(addressMap);
-			final Map<String, PlcTag> tags = getSchemaCache().retrieveTags(addressMap);
+		final FlowFile originalFlowFile = fileToProcess;
 
-			if (tags != null){
-				for (Map.Entry<String,PlcTag> tag : tags.entrySet()){
-					builder.addTag(tag.getKey(), tag.getValue());
-				}
-			} else {
-				if (debugEnabled)
-                    logger.debug("Plc-Avro schema and PlcTypes resolution not found in cache and will be added with key: " + addressMap);
-				for (Map.Entry<String,String> entry: addressMap.entrySet()){
-					builder.addTagAddress(entry.getKey(), entry.getValue());
-				}
-			}
-            
-			PlcReadRequest readRequest = builder.build();
-			final FlowFile originalFlowFile = fileToProcess;
-			resultSetFF = session.write(resultSetFF, out -> {
-				try {
-					PlcReadResponse readResponse = readRequest.execute().get(this.timeout, TimeUnit.MILLISECONDS);
+		Plc4xWriter plc4xWriter = new RecordPlc4xWriter(context.getProperty(PLC_RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class), 
+			fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
+
+
+		try {
+			session.write(resultSetFF, out -> {
+				final Map<String,String> addressMap = getPlcAddressMap(context, originalFlowFile);
+				final RecordSchema recordSchema = getSchemaCache().retrieveSchema(addressMap);
+				final Map<String, PlcTag> tags = getSchemaCache().retrieveTags(addressMap);
+				PlcReadRequest readRequest;
+				Long nrOfRowsHere;
+
+				try (PlcConnection connection = getConnectionManager().getConnection(getConnectionString(context, originalFlowFile))) {
 					
-					if(originalFlowFile == null) //there is no inherit attributes to use in writer service 
-						nrOfRows.set(plc4xWriter.writePlcReadResponse(readResponse, out, logger, null, recordSchema));
-					else 
-						nrOfRows.set(plc4xWriter.writePlcReadResponse(readResponse, out, logger, null, recordSchema, originalFlowFile));
-				} catch (InterruptedException e) {
-					logger.error("InterruptedException reading the data from PLC", e);
-		            Thread.currentThread().interrupt();
-		            throw new ProcessException(e);
+					readRequest =  getReadRequest(logger, addressMap, tags, connection);
+					
+					PlcReadResponse readResponse = readRequest.execute().get(getTimeout(context, originalFlowFile), TimeUnit.MILLISECONDS);
+							
+					nrOfRowsHere = evaluateReadResponse(context, logger, originalFlowFile, plc4xWriter, out, recordSchema, readResponse);
+
 				} catch (TimeoutException e) {
 					logger.error("Timeout reading the data from PLC", e);
+					getConnectionManager().removeCachedConnection(getConnectionString(context, originalFlowFile));
 					throw new ProcessException(e);
+				} catch (PlcConnectionException e) {
+					logger.error("Error getting the PLC connection", e);
+					throw new ProcessException("Got an a PlcConnectionException while trying to get a connection", e);
 				} catch (Exception e) {
 					logger.error("Exception reading the data from PLC", e);
 					throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
 				}
-			});
 
-			if (recordSchema == null){
-				if (debugEnabled)
-                    logger.debug("Adding Plc-Avro schema and PlcTypes resolution into cache with key: " + addressMap);
-				getSchemaCache().addSchema(
-					addressMap, 
-					readRequest.getTagNames(),
-					readRequest.getTags(),
-					plc4xWriter.getRecordSchema()
-				);
-			}
-			long executionTimeElapsed = executeTime.getElapsed(TimeUnit.MILLISECONDS);
-			final Map<String, String> attributesToAdd = new HashMap<>();
-			attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
-			attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
-			if (inputFileUUID != null) {
-				attributesToAdd.put(INPUT_FLOWFILE_UUID, inputFileUUID);
-			}
-			attributesToAdd.putAll(plc4xWriter.getAttributesToAdd());
-			resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd);
-			plc4xWriter.updateCounters(session);
-			logger.info("{} contains {} records; transferring to 'success'", new Object[] { resultSetFF, nrOfRows.get() });
-			// Report a FETCH event if there was an incoming flow file, or a RECEIVE event
-			// otherwise
-			if (context.hasIncomingConnection()) {
-				session.getProvenanceReporter().fetch(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed);
-			} else {
-				session.getProvenanceReporter().receive(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed);
-			}
+				if (recordSchema == null){
+					if (debugEnabled)
+						logger.debug("Adding PlcTypes resolution into cache with key: " + addressMap);
+					getSchemaCache().addSchema(
+						addressMap, 
+						readRequest.getTagNames(),
+						readRequest.getTags(),
+						plc4xWriter.getRecordSchema()
+					);
+				}
+				nrOfRows.set(nrOfRowsHere);
+
+			});
 			
-			session.transfer(resultSetFF, BasePlc4xProcessor.REL_SUCCESS);
-			// Need to remove the original input file if it exists
+		} catch (Exception e) {
+			logger.error("Exception reading the data from the PLC", e);
 			if (fileToProcess != null) {
-				session.remove(fileToProcess);
-				fileToProcess = null;
+				fileToProcess = session.putAttribute(fileToProcess, EXCEPTION, e.getLocalizedMessage());
+				session.transfer(fileToProcess, REL_FAILURE);
 			}
+			session.remove(resultSetFF);
 			session.commitAsync();
-			
-		} catch (PlcConnectionException e) {
-			logger.error("Error getting the PLC connection", e);
-			throw new ProcessException("Got an a PlcConnectionException while trying to get a connection", e);
-		} catch (Exception e) {
-			logger.error("Got an error while trying to get a connection", e);
-			throw new ProcessException("Got an error while trying to get a connection", e);
+			throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
+		}
+
+		plc4xWriter.updateCounters(session);
+		long executionTimeElapsed = executeTime.getElapsed(TimeUnit.MILLISECONDS);
+		final Map<String, String> attributesToAdd = new HashMap<>();
+		attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+		attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
+		attributesToAdd.putAll(plc4xWriter.getAttributesToAdd());
+
+		session.putAllAttributes(resultSetFF, attributesToAdd);
+		
+		logger.info("{} contains {} records; transferring to 'success'", resultSetFF, nrOfRows.get());
+
+		if (context.hasIncomingConnection()) {
+			session.getProvenanceReporter().fetch(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed);
+		} else {
+			session.getProvenanceReporter().receive(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed);
+		}
+		
+		if (fileToProcess != null) {
+			session.remove(fileToProcess);
 		}
+		session.transfer(resultSetFF, REL_SUCCESS);
 	}
+
 }
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessStrategy.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessStrategy.java
index 434e924d4a..3794c0ba07 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessStrategy.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessStrategy.java
@@ -16,11 +16,32 @@
  */
 package org.apache.plc4x.nifi.address;
 
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 
+import java.util.List;
 import java.util.Map;
 
 public interface AddressesAccessStrategy {
+    /**
+     * Returns the allowable value associated with the strategy.
+     * @return AllowableValue the allowable value associated
+     */
+    AllowableValue getAllowableValue();
+
+    /**
+     * Returns the list of property descriptors needed for the strategy.
+     * @return List of PropertyDescriptor needed for the strategy
+     */
+    List<PropertyDescriptor> getPropertyDescriptors();
+    
+    /**
+     * Returns a map with the names and addresses of the tags.
+     * @param context the context of the processor
+     * @param flowFile the FlowFile being processed
+     * @return Map with the tag names and addresses
+     */
     Map<String, String> extractAddresses(final ProcessContext context, final FlowFile flowFile);
 }
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessUtils.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessUtils.java
index bfeff23fdd..9b63d584b2 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessUtils.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/AddressesAccessUtils.java
@@ -22,45 +22,75 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.JsonValidator;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.plc4x.java.DefaultPlcDriverManager;
 
 public class AddressesAccessUtils {
-    public static final AllowableValue ADDRESS_PROPERTY = new AllowableValue(
-            "property-address",
-            "Use Properties as Addresses",
-            "Each property will be treated as tag-address pairs after Expression Language is evaluated.");
 
-    public static final AllowableValue ADDRESS_TEXT = new AllowableValue(
-            "text-address",
-            "Use 'Address Text' Property",
-            "Addresses will be obtained from 'Address Text' Property. It's content must be a valid JSON " +
-                    "after Expression Language is evaluated. ");
+	private static DefaultPlcDriverManager manager = new DefaultPlcDriverManager();
 
-    public static final PropertyDescriptor PLC_ADDRESS_ACCESS_STRATEGY = new PropertyDescriptor.Builder()
-            .name("plc4x-address-access-strategy")
-            .displayName("Address Access Strategy")
-            .description("Strategy used to obtain the PLC addresses")
-            .allowableValues(ADDRESS_PROPERTY, ADDRESS_TEXT)
-            .defaultValue(ADDRESS_PROPERTY.getValue())
-            .required(true)
-            .build();
+    public static DefaultPlcDriverManager getManager() {
+        return manager;
+    }
+	
+	public static final AllowableValue ADDRESS_PROPERTY = new AllowableValue(
+		"property-address",
+		"Use Properties as Addresses",
+		"Each property will be treated as tag-address pairs after Expression Language is evaluated.");
 
-    public static final PropertyDescriptor ADDRESS_TEXT_PROPERTY = new PropertyDescriptor.Builder()
-            .name("text-address-property")
-            .displayName("Address Text")
-            .description("Must contain a valid JSON object after Expression Language is evaluated. "
-                    + "Each field-value is treated as tag-address.")
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .addValidator(new JsonValidator())
-            .dependsOn(PLC_ADDRESS_ACCESS_STRATEGY, ADDRESS_TEXT)
-            .required(true)
-            .build();
+	public static final AllowableValue ADDRESS_TEXT = new AllowableValue(
+		"text-address",
+		"Use 'Address Text' Property",
+		"Addresses will be obtained from 'Address Text' Property. It's content must be a valid JSON " +
+			"after Expression Language is evaluated. ");
 
-    public static AddressesAccessStrategy getAccessStrategy(final ProcessContext context) {
-        String value = context.getProperty(PLC_ADDRESS_ACCESS_STRATEGY).getValue();
-        if (ADDRESS_PROPERTY.getValue().equalsIgnoreCase(value))
-            return new DynamicPropertyAccessStrategy();
-        else if (ADDRESS_TEXT.getValue().equalsIgnoreCase(value))
-            return new TextPropertyAccessStrategy();
-        return null;
-    }
+	public static final AllowableValue ADDRESS_FILE = new AllowableValue(
+		"file-address",
+		"Use 'Address File' Property",
+		"Addresses will be obtained from the file in 'Address File' Property. It's content must be a valid JSON " +
+			"after Expression Language is evaluated. ");
+
+	public static final PropertyDescriptor PLC_ADDRESS_ACCESS_STRATEGY = new PropertyDescriptor.Builder()
+		.name("plc4x-address-access-strategy")
+		.displayName("Address Access Strategy")
+		.description("Strategy used to obtain the PLC addresses")
+		.allowableValues(ADDRESS_PROPERTY, ADDRESS_TEXT, ADDRESS_FILE)
+		.defaultValue(ADDRESS_PROPERTY.getValue())
+		.required(true)
+		.build();
+
+	public static final PropertyDescriptor ADDRESS_TEXT_PROPERTY = new PropertyDescriptor.Builder()
+		.name("text-address-property")
+		.displayName("Address Text")
+		.description("Must contain a valid JSON object after Expression Language is evaluated. "
+			+ "Each field-value is treated as tag-address.")
+		.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+		.addValidator(new JsonValidator())
+		.addValidator(new TextPropertyAccessStrategy.TagValidator(manager))
+		.dependsOn(PLC_ADDRESS_ACCESS_STRATEGY, ADDRESS_TEXT)
+		.required(true)
+		.build();
+
+	public static final PropertyDescriptor ADDRESS_FILE_PROPERTY = new PropertyDescriptor.Builder()
+		.name("file-address-property")
+		.displayName("Address File")
+		.description("Must contain a valid path after Expression Language is evaluated. "
+			+ "The file content must be a valid JSON and each field-value is treated as tag-address.")
+		.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+		.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+		.addValidator(new FilePropertyAccessStrategy.TagValidator(manager))
+		.dependsOn(PLC_ADDRESS_ACCESS_STRATEGY, ADDRESS_FILE)
+		.required(true)
+		.build();
+
+	public static AddressesAccessStrategy getAccessStrategy(final ProcessContext context) {
+		String value = context.getProperty(PLC_ADDRESS_ACCESS_STRATEGY).getValue();
+		if (ADDRESS_PROPERTY.getValue().equalsIgnoreCase(value))
+			return new DynamicPropertyAccessStrategy();
+		else if (ADDRESS_TEXT.getValue().equalsIgnoreCase(value))
+			return new TextPropertyAccessStrategy();
+		else if (ADDRESS_FILE.getValue().equalsIgnoreCase(value))
+			return new FilePropertyAccessStrategy();
+		return null;
+	}
 }
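
The factory above is what ties the access-strategy property to a concrete implementation; the getPlcAddressMap(context, flowFile) calls in the processors presumably delegate to it via BasePlc4xProcessor. A minimal usage sketch, assuming context and flowFile are the usual onTrigger arguments (variable names are illustrative, not taken from the commit):

    // Resolve the strategy chosen through the "Address Access Strategy" property
    AddressesAccessStrategy strategy = AddressesAccessUtils.getAccessStrategy(context);
    // Obtain the tag-name -> PLC-address map from dynamic properties, JSON text or a JSON file,
    // depending on the selected strategy
    Map<String, String> addressMap = strategy.extractAddresses(context, flowFile);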
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/BaseAccessStrategy.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/BaseAccessStrategy.java
new file mode 100644
index 0000000000..c94d7d3141
--- /dev/null
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/BaseAccessStrategy.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.plc4x.nifi.address;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.plc4x.java.DefaultPlcDriverManager;
+import org.apache.plc4x.java.api.PlcDriver;
+import org.apache.plc4x.nifi.BasePlc4xProcessor;
+
+
+public abstract class BaseAccessStrategy implements AddressesAccessStrategy {
+    private boolean isInitialized = false;
+    private boolean isDynamic;
+    protected Map<String,String> cachedAddresses = null;
+
+    protected AllowableValue allowableValue;
+    protected List<PropertyDescriptor> propertyDescriptors = new ArrayList<>();
+
+    protected Map<String, String> getCachedAddresses() {
+        return cachedAddresses;
+    }
+
+    public Map<String,String> extractAddressesFromResources(final ProcessContext context, final FlowFile flowFile) {
+        throw new UnsupportedOperationException("Method 'extractAddressesFromResources' not implemented");
+    }
+
+
+    @Override
+    public Map<String, String> extractAddresses(final ProcessContext context, final FlowFile flowFile) {
+        if (!isInitialized) {
+            getPropertyDescriptors().forEach(prop -> {
+                if (context.isExpressionLanguagePresent(prop)){
+                    isDynamic = true;
+                }
+            });
+            isInitialized = true;
+        }
+
+        Map<String, String> result = getCachedAddresses();
+        if (result == null) {
+            result = extractAddressesFromResources(context, flowFile);
+            if (!isDynamic) {
+                cachedAddresses = result;
+            }
+        }
+        return result;
+    }
+
+    public static class TagValidator implements Validator {
+        
+        private DefaultPlcDriverManager manager;
+
+        public TagValidator(DefaultPlcDriverManager manager) {
+            this.manager = manager;
+        }
+
+        protected void checkTags(PlcDriver driver, Collection<String> tags) {
+            for (String tag : tags) {
+                driver.prepareTag(tag);
+            }
+        }
+
+        protected Collection<String> getTags(String input) throws Exception {
+            throw new UnsupportedOperationException("Method 'getTags' not implemented");
+        } 
+
+        @Override
+        public ValidationResult validate(String subject, String input, ValidationContext context) {
+            String connectionString = context.getProperty(BasePlc4xProcessor.PLC_CONNECTION_STRING).getValue();
+
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input) || 
+                context.isExpressionLanguagePresent(connectionString)) {
+                return new ValidationResult.Builder().subject(subject).input(input)
+                    .explanation("Expression Language Present").valid(true).build();
+            }
+
+            try {
+                PlcDriver driver = manager.getDriverForUrl(connectionString);
+
+                if (!context.isExpressionLanguagePresent(input)) {
+                    checkTags(driver, getTags(input));
+                } 
+                
+            } catch (Exception e) {
+                return new ValidationResult.Builder().subject(subject)
+                    .explanation(e.getLocalizedMessage())
+                    .valid(false)
+                    .build();
+            }
+            
+            return new ValidationResult.Builder().subject(subject)
+                .explanation("")
+                .valid(true)
+                .build();
+        }
+    }
+    
+}
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/DynamicPropertyAccessStrategy.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/DynamicPropertyAccessStrategy.java
index fe7c3c5db2..879766f6de 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/DynamicPropertyAccessStrategy.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/DynamicPropertyAccessStrategy.java
@@ -17,14 +17,34 @@
 
 package org.apache.plc4x.nifi.address;
 
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.plc4x.java.DefaultPlcDriverManager;
 
-public class DynamicPropertyAccessStrategy implements AddressesAccessStrategy{
+
+public class DynamicPropertyAccessStrategy extends BaseAccessStrategy {
+
+    @Override
+    public AllowableValue getAllowableValue() {
+        return AddressesAccessUtils.ADDRESS_PROPERTY;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getPropertyDescriptors() {
+        return List.of();
+    }
+
+    @Override
+    public Map<String,String> extractAddressesFromResources(final ProcessContext context, final FlowFile flowFile) {
+        return extractAddressesFromAttributes(context, flowFile);
+    }
 
     private Map<String,String> extractAddressesFromAttributes(final ProcessContext context, final FlowFile flowFile) {
         Map<String,String> addressMap = new HashMap<>();
@@ -35,7 +55,16 @@ public class DynamicPropertyAccessStrategy implements AddressesAccessStrategy{
         return addressMap; 
     }
 
-    public Map<String, String> extractAddresses(final ProcessContext context, final FlowFile flowFile) {
-        return extractAddressesFromAttributes(context, flowFile);
+
+    public static class TagValidator extends BaseAccessStrategy.TagValidator {
+        public TagValidator(DefaultPlcDriverManager manager) {
+            super(manager);
+        }
+
+        @Override
+        protected Collection<String> getTags(String input) throws Exception {
+            return List.of(input);
+        }
     }
+
 }
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/FilePropertyAccessStrategy.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/FilePropertyAccessStrategy.java
new file mode 100644
index 0000000000..b3214375b8
--- /dev/null
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/FilePropertyAccessStrategy.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.plc4x.nifi.address;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+
+import java.nio.file.StandardOpenOption;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.plc4x.java.DefaultPlcDriverManager;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class FilePropertyAccessStrategy extends BaseAccessStrategy {
+
+    @Override
+    public AllowableValue getAllowableValue() {
+        return AddressesAccessUtils.ADDRESS_FILE;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getPropertyDescriptors() {
+        return List.of(AddressesAccessUtils.ADDRESS_FILE_PROPERTY);
+    }
+
+    @Override
+    public Map<String, String> extractAddressesFromResources(final ProcessContext context, final FlowFile flowFile) throws ProcessException{
+        try {
+            return extractAddressesFromFile(context.getProperty(AddressesAccessUtils.ADDRESS_FILE_PROPERTY).evaluateAttributeExpressions(flowFile).getValue());
+        } catch (Exception e) {
+            throw new ProcessException(e.toString());
+        }
+    }
+
+    public static Map<String,String> extractAddressesFromFile(String fileName) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+
+        Path filePath = Path.of(fileName);
+        InputStream input = Files.newInputStream(filePath, StandardOpenOption.READ);
+        
+        return mapper.readerForMapOf(String.class).readValue(input);
+    }
+
+    public static class TagValidator extends BaseAccessStrategy.TagValidator {
+        public TagValidator(DefaultPlcDriverManager manager) {
+            super(manager);
+        }
+
+        @Override
+        protected Collection<String> getTags(String input) throws Exception {
+            return FilePropertyAccessStrategy.extractAddressesFromFile(input).values();
+        } 
+    }
+}
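
For reference, the file consumed by this strategy is expected to hold a flat JSON object mapping tag names to driver-specific addresses, the same shape the 'Address Text' strategy expects inline. A short sketch with a made-up path and tag names:

    // /tmp/plc-addresses.json (illustrative content):
    //   { "motor_speed": "<driver-specific address>", "valve_open": "<driver-specific address>" }
    Map<String, String> addresses =
        FilePropertyAccessStrategy.extractAddressesFromFile("/tmp/plc-addresses.json");
    // addresses.get("motor_speed") returns the configured address string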
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/TextPropertyAccessStrategy.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/TextPropertyAccessStrategy.java
index 7f5c955401..78149f3614 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/TextPropertyAccessStrategy.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/address/TextPropertyAccessStrategy.java
@@ -17,30 +17,54 @@
 
 package org.apache.plc4x.nifi.address;
 
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.ProcessException;
-
+import org.apache.plc4x.java.DefaultPlcDriverManager;
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-public class TextPropertyAccessStrategy implements AddressesAccessStrategy{
-    private Map<String,String> extractAddressesFromText(String input) throws JsonProcessingException {
-        ObjectMapper mapper = new ObjectMapper();
+public class TextPropertyAccessStrategy extends BaseAccessStrategy {
 
-        return mapper.readValue(input, Map.class);
+    @Override
+    public AllowableValue getAllowableValue() {
+        return AddressesAccessUtils.ADDRESS_TEXT;
     }
 
     @Override
-    public Map<String, String> extractAddresses(final ProcessContext context, final FlowFile flowFile) throws ProcessException{
+    public List<PropertyDescriptor> getPropertyDescriptors() {
+        return List.of(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY);
+    }
+
+    @Override
+    public Map<String, String> extractAddressesFromResources(final ProcessContext context, final FlowFile flowFile) throws ProcessException{
         try {
             return extractAddressesFromText(context.getProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY).evaluateAttributeExpressions(flowFile).getValue());
         } catch (Exception e) {
             throw new ProcessException(e.toString());
         }
-        
+    }
+
+    private static Map<String,String> extractAddressesFromText(String input) throws JsonProcessingException {
+        ObjectMapper mapper = new ObjectMapper();
+
+        return mapper.readerForMapOf(String.class).readValue(input);
+    }
+
+    public static class TagValidator extends BaseAccessStrategy.TagValidator {
+        public TagValidator(DefaultPlcDriverManager manager) {
+            super(manager);
+        }
+
+        @Override
+        protected Collection<String> getTags(String input) throws Exception {
+            return TextPropertyAccessStrategy.extractAddressesFromText(input).values();
+        } 
     }
 }
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
index 69157594ec..f546159e48 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java
@@ -20,6 +20,7 @@ package org.apache.plc4x.nifi.record;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -46,12 +47,18 @@ public class Plc4xReadResponseRecordSet implements RecordSet, Closeable {
     private Set<String> rsColumnNames;
     private boolean moreRows;
     private final boolean debugEnabled = logger.isDebugEnabled();
+    private final String timestampFieldName; 
     private boolean isSubscription = false;
+    private Instant timestamp;
 
    	private final AtomicReference<RecordSchema> recordSchema = new AtomicReference<>(null);
 
-    public Plc4xReadResponseRecordSet(final PlcReadResponse readResponse, RecordSchema recordSchema) throws IOException {
+    public Plc4xReadResponseRecordSet(final PlcReadResponse readResponse, RecordSchema recordSchema, String timestampFieldName) {
+        this.timestampFieldName = timestampFieldName;
         this.readResponse = readResponse;
+        if (!isSubscription) {
+            timestamp = Instant.now();
+        }
         moreRows = true;
         
         isSubscription = readResponse.getRequest() == null;
@@ -68,7 +75,7 @@ public class Plc4xReadResponseRecordSet implements RecordSet, Closeable {
         rsColumnNames = responseDataStructure.keySet();
                
         if (recordSchema == null) {
-        	Schema avroSchema = Plc4xCommon.createSchema(responseDataStructure);     	
+        	Schema avroSchema = Plc4xCommon.createSchema(responseDataStructure, this.timestampFieldName);     	
         	this.recordSchema.set(AvroTypeUtil.createSchema(avroSchema));
         } else {
             this.recordSchema.set(recordSchema);
@@ -78,7 +85,7 @@ public class Plc4xReadResponseRecordSet implements RecordSet, Closeable {
 
     }
 
-    public Map<String, PlcValue> plc4xSubscriptionResponseRecordSet(final DefaultPlcSubscriptionEvent subscriptionEvent) throws IOException {;
+    public Map<String, PlcValue> plc4xSubscriptionResponseRecordSet(final DefaultPlcSubscriptionEvent subscriptionEvent) {
         moreRows = true;
         
         if (debugEnabled)
@@ -131,7 +138,7 @@ public class Plc4xReadResponseRecordSet implements RecordSet, Closeable {
         //do nothing
     }
 
-    protected Record createRecord(final PlcReadResponse readResponse) throws IOException{
+    protected Record createRecord(final PlcReadResponse readResponse) {
         final Map<String, Object> values = new HashMap<>(getSchema().getFieldCount());
 
         if (debugEnabled)
@@ -158,7 +165,12 @@ public class Plc4xReadResponseRecordSet implements RecordSet, Closeable {
         }
 
         //add timestamp tag to schema
-        values.put(Plc4xCommon.PLC4X_RECORD_TIMESTAMP_FIELD_NAME, System.currentTimeMillis());
+        if (isSubscription) {
+            values.put(timestampFieldName, ((DefaultPlcSubscriptionEvent) readResponse).getTimestamp().toEpochMilli());
+        } else {
+            values.put(timestampFieldName, timestamp.toEpochMilli());
+        }
+        
         if (debugEnabled)
             logger.debug("added timestamp tag to record.");
 
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xWriter.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xWriter.java
index a950ac3735..1d774e79ef 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xWriter.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xWriter.java
@@ -39,8 +39,8 @@ public interface Plc4xWriter {
      * @return the number of rows written to the output stream
      * @throws Exception if any errors occur during the writing of the result set to the output stream
      */
-    long writePlcReadResponse(PlcReadResponse response, OutputStream outputStream, ComponentLog logger,  Plc4xReadResponseRowCallback callback, RecordSchema recordSchema) throws Exception;
-    long writePlcReadResponse(PlcReadResponse response, OutputStream outputStream, ComponentLog logger,  Plc4xReadResponseRowCallback callback, RecordSchema recordSchema, FlowFile originalFlowFile) throws Exception;
+    long writePlcReadResponse(PlcReadResponse response, OutputStream outputStream, ComponentLog logger,  Plc4xReadResponseRowCallback callback, RecordSchema recordSchema, String timestampFieldName) throws Exception;
+    long writePlcReadResponse(PlcReadResponse response, OutputStream outputStream, ComponentLog logger,  Plc4xReadResponseRowCallback callback, RecordSchema recordSchema, FlowFile originalFlowFile, String timestampFieldName) throws Exception;
 
     /**
      * Returns a map of attribute key/value pairs to be added to any outgoing flow file(s). The default implementation is to return an empty map.
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
index ca8f0267cd..f1e314b5da 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/RecordPlc4xWriter.java
@@ -55,9 +55,11 @@ public class RecordPlc4xWriter implements Plc4xWriter {
     }
 
     @Override
-    public long writePlcReadResponse(PlcReadResponse response, OutputStream outputStream, ComponentLog logger, Plc4xReadResponseRowCallback callback, RecordSchema recordSchema) throws Exception {
-        if (fullRecordSet == null) {
-            fullRecordSet = new Plc4xReadResponseRecordSetWithCallback(response, callback, recordSchema);
+    public long writePlcReadResponse(PlcReadResponse response, OutputStream outputStream, ComponentLog logger, 
+                Plc4xReadResponseRowCallback callback, RecordSchema recordSchema, String timestampFieldName) throws Exception {
+        
+        if (fullRecordSet == null) {
+            fullRecordSet = new Plc4xReadResponseRecordSetWithCallback(response, callback, recordSchema, timestampFieldName);
             writeSchema = recordSetWriterFactory.getSchema(originalAttributes, fullRecordSet.getSchema());
         }
         Map<String, String> empty = new HashMap<>();
@@ -73,9 +75,11 @@ public class RecordPlc4xWriter implements Plc4xWriter {
     }
 
     @Override
-    public long writePlcReadResponse(PlcReadResponse response, OutputStream outputStream, ComponentLog logger, Plc4xReadResponseRowCallback callback, RecordSchema recordSchema, FlowFile originalFlowFile) throws Exception {
-        if (fullRecordSet == null) {
-            fullRecordSet = new Plc4xReadResponseRecordSetWithCallback(response, callback, recordSchema);
+    public long writePlcReadResponse(PlcReadResponse response, OutputStream outputStream, ComponentLog logger, 
+            Plc4xReadResponseRowCallback callback, RecordSchema recordSchema, FlowFile originalFlowFile, String timestampFieldName) throws Exception {
+        
+        if (fullRecordSet == null) {
+            fullRecordSet = new Plc4xReadResponseRecordSetWithCallback(response, callback, recordSchema, timestampFieldName);
             writeSchema = recordSetWriterFactory.getSchema(originalAttributes, fullRecordSet.getSchema());
         }
 
@@ -158,8 +162,10 @@ public class RecordPlc4xWriter implements Plc4xWriter {
     private static class Plc4xReadResponseRecordSetWithCallback extends Plc4xReadResponseRecordSet {
         private final Plc4xReadResponseRowCallback callback;
 
-        public Plc4xReadResponseRecordSetWithCallback(final PlcReadResponse readResponse, Plc4xReadResponseRowCallback callback, RecordSchema recordSchema) throws IOException {
-            super(readResponse, recordSchema);
+        public Plc4xReadResponseRecordSetWithCallback(final PlcReadResponse readResponse, Plc4xReadResponseRowCallback callback, 
+                RecordSchema recordSchema, String timestampFieldName) {
+
+            super(readResponse, recordSchema, timestampFieldName);
             this.callback = callback;
         }
 
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
index 801bbd90fb..df80ffd6d6 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/SchemaCache.java
@@ -19,9 +19,9 @@
 package org.apache.plc4x.nifi.record;
 
 import java.util.HashMap;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -60,7 +60,7 @@ public class SchemaCache {
      * @param tagsList list of PlcTag's
      * @param schema record schema used for PlcResponse serialization. Can be null
      */
-    public void addSchema(final Map<String,String> schemaIdentifier, final LinkedHashSet<String> tagsNames, final List<? extends PlcTag> tagsList,  final RecordSchema schema) {        
+    public void addSchema(final Map<String,String> schemaIdentifier, final Set<String> tagsNames, final List<? extends PlcTag> tagsList,  final RecordSchema schema) {        
         if (!schemaMap.containsKey(schemaIdentifier.toString())){
             if (nextSchemaPosition.get() == cacheSize.get()){
                 nextSchemaPosition.set(0);
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java
index a46cea7ee5..b46f9f72cc 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/subscription/Plc4xListenerDispatcher.java
@@ -45,7 +45,6 @@ public class Plc4xListenerDispatcher implements Runnable {
     private ComponentLog logger;
     private boolean running = false;
     private BlockingQueue<PlcSubscriptionEvent> events;
-    private PlcSubscriptionResponse subscriptionResponse;
     private PlcConnection connection;
     private Long timeout;
     private BlockingQueue<PlcSubscriptionEvent> queuedEvents;
@@ -95,7 +94,7 @@ public class Plc4xListenerDispatcher implements Runnable {
             }
         }
         PlcSubscriptionRequest subscriptionRequest = builder.build();
-
+        PlcSubscriptionResponse subscriptionResponse;
         try {
             subscriptionResponse = subscriptionRequest.execute().get(timeout, TimeUnit.MILLISECONDS);
             
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/util/Plc4xCommon.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/util/Plc4xCommon.java
index a0fd6c9258..00075f6cfc 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/util/Plc4xCommon.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/util/Plc4xCommon.java
@@ -53,9 +53,6 @@ import org.apache.plc4x.java.spi.values.PlcWORD;
 
 public class Plc4xCommon {
 
-	
-	public static final String PLC4X_RECORD_TIMESTAMP_FIELD_NAME = "ts";
-	
 	/**
 	 * This method is used to infer output AVRO schema directly from the PlcReadResponse object. 
 	 * It is directly used from the RecordPlc4xWriter.writePlcReadResponse() method.
@@ -66,7 +63,7 @@ public class Plc4xCommon {
 	 * @param responseDataStructure: a map that reflects the structure of the answer given by the PLC when making a Read Request.
 	 * @return AVRO Schema built from responseDataStructure.
 	 */
-	public static Schema createSchema(Map<String, ? extends PlcValue> responseDataStructure){
+	public static Schema createSchema(Map<String, ? extends PlcValue> responseDataStructure, String timestampFieldName){
 		//plc and record datatype map
 		final FieldAssembler<Schema> builder = SchemaBuilder.record("PlcReadResponse").namespace("any.data").fields();	
 		String fieldName = null;
@@ -108,7 +105,7 @@ public class Plc4xCommon {
 		}
 		
 		//add timestamp tag to schema
-		builder.name(PLC4X_RECORD_TIMESTAMP_FIELD_NAME).type().longType().noDefault();
+		builder.name(timestampFieldName).type().longType().noDefault();
 		
 		
 		return builder.endRecord();
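
With the hard-coded "ts" constant removed, the timestamp column of the generated Avro schema is now named by the caller. A rough sketch of the new signature in use, assuming PlcWORD accepts an int value (tag and field names are invented for illustration):

    // One WORD tag plus a long timestamp field named "event_ts"
    Map<String, PlcValue> responseData = Map.of("motor_status", new PlcWORD(42));
    Schema avroSchema = Plc4xCommon.createSchema(responseData, "event_ts");
    // avroSchema contains the fields "motor_status" and "event_ts" (long, no default)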
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkProcessorTest.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkProcessorTest.java
index 90d128a9fb..61601724a4 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkProcessorTest.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkProcessorTest.java
@@ -19,12 +19,16 @@ package org.apache.plc4x.nifi;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.apache.plc4x.nifi.address.AddressesAccessUtils;
+import org.apache.plc4x.nifi.address.FilePropertyAccessStrategy;
 import org.apache.plc4x.nifi.util.Plc4xCommonTest;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -41,7 +45,7 @@ public class Plc4xSinkProcessorTest {
         testRunner.setValidateExpressionUsage(false);
 
         testRunner.setProperty(Plc4xSinkProcessor.PLC_CONNECTION_STRING, "simulated://127.0.0.1");
-        testRunner.setProperty(Plc4xSinkRecordProcessor.PLC_FUTURE_TIMEOUT_MILISECONDS, "1000");
+        testRunner.setProperty(Plc4xSinkProcessor.PLC_FUTURE_TIMEOUT_MILISECONDS, "1000");
 
         testRunner.addConnection(Plc4xSinkProcessor.REL_SUCCESS);
         testRunner.addConnection(Plc4xSinkProcessor.REL_FAILURE);
@@ -73,4 +77,17 @@ public class Plc4xSinkProcessorTest {
         testProcessor();
     }
 
+    // Test address file property access strategy
+    @Test
+    public void testWithAddressFile() throws InitializationException {
+        testRunner.setProperty(AddressesAccessUtils.ADDRESS_FILE_PROPERTY, "file");
+
+        try (MockedStatic<FilePropertyAccessStrategy> staticMock = Mockito.mockStatic(FilePropertyAccessStrategy.class)) {
+            staticMock.when(() -> FilePropertyAccessStrategy.extractAddressesFromFile("file"))
+                .thenReturn(Plc4xCommonTest.getAddressMap());
+
+            testProcessor();
+        }
+    }
+
 }
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessorTest.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessorTest.java
index 627ab2e80b..9f22b62bc8 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessorTest.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSinkRecordProcessorTest.java
@@ -25,9 +25,12 @@ import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.apache.plc4x.nifi.address.AddressesAccessUtils;
+import org.apache.plc4x.nifi.address.FilePropertyAccessStrategy;
 import org.apache.plc4x.nifi.util.Plc4xCommonTest;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -91,4 +94,17 @@ public class Plc4xSinkRecordProcessorTest {
 		testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY, new ObjectMapper().writeValueAsString(Plc4xCommonTest.getAddressMap()).toString());
 		testAvroRecordReaderProcessor();
 	}
+
+	// Test address file property access strategy
+    @Test
+    public void testWithAddressFile() throws InitializationException {
+        testRunner.setProperty(AddressesAccessUtils.ADDRESS_FILE_PROPERTY, "file");
+
+        try (MockedStatic<FilePropertyAccessStrategy> staticMock = Mockito.mockStatic(FilePropertyAccessStrategy.class)) {
+            staticMock.when(() -> FilePropertyAccessStrategy.extractAddressesFromFile("file"))
+                .thenReturn(Plc4xCommonTest.getAddressMap());
+
+            testAvroRecordReaderProcessor();
+        }
+    }
 }
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceProcessorTest.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceProcessorTest.java
index 331ab51eb1..a5e79ceb03 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceProcessorTest.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceProcessorTest.java
@@ -21,9 +21,12 @@ package org.apache.plc4x.nifi;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.apache.plc4x.nifi.address.AddressesAccessUtils;
+import org.apache.plc4x.nifi.address.FilePropertyAccessStrategy;
 import org.apache.plc4x.nifi.util.Plc4xCommonTest;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -37,10 +40,11 @@ public class Plc4xSourceProcessorTest {
     public void init() {
         testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
         testRunner.setIncomingConnection(false);
-        testRunner.setValidateExpressionUsage(false);
+        testRunner.setValidateExpressionUsage(true);
 
-        testRunner.setProperty(Plc4xSourceProcessor.PLC_CONNECTION_STRING, "simulated://127.0.0.1");
-        testRunner.setProperty(Plc4xSinkRecordProcessor.PLC_FUTURE_TIMEOUT_MILISECONDS, "1000");
+        testRunner.setVariable("url", "simulated://127.0.0.1");
+        testRunner.setProperty(Plc4xSourceProcessor.PLC_CONNECTION_STRING, "${url}");
+        testRunner.setProperty(Plc4xSourceProcessor.PLC_FUTURE_TIMEOUT_MILISECONDS, "1000");
 
         testRunner.addConnection(Plc4xSourceProcessor.REL_SUCCESS);
         testRunner.addConnection(Plc4xSourceProcessor.REL_FAILURE);
@@ -69,4 +73,16 @@ public class Plc4xSourceProcessorTest {
         testProcessor();
     }
 
+    // Test address file property access strategy
+    @Test
+    public void testWithAddressFile() {
+        testRunner.setProperty(AddressesAccessUtils.ADDRESS_FILE_PROPERTY, "file");
+
+        try (MockedStatic<FilePropertyAccessStrategy> staticMock = Mockito.mockStatic(FilePropertyAccessStrategy.class)) {
+            staticMock.when(() -> FilePropertyAccessStrategy.extractAddressesFromFile("file"))
+                .thenReturn(Plc4xCommonTest.getAddressMap());
+
+            testProcessor();
+        }
+    }
 }
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessorTest.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessorTest.java
index 4e7c43102d..5d47d0612c 100644
--- a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessorTest.java
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/Plc4xSourceRecordProcessorTest.java
@@ -23,9 +23,12 @@ import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.apache.plc4x.nifi.address.AddressesAccessUtils;
+import org.apache.plc4x.nifi.address.FilePropertyAccessStrategy;
 import org.apache.plc4x.nifi.util.Plc4xCommonTest;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -61,7 +64,7 @@ public class Plc4xSourceRecordProcessorTest {
     	testRunner.assertTransferCount(Plc4xSourceRecordProcessor.REL_FAILURE, 0);
     	testRunner.assertTransferCount(Plc4xSourceRecordProcessor.REL_SUCCESS, NUMBER_OF_CALLS);
 
-		Plc4xCommonTest.assertAvroContent(testRunner.getFlowFilesForRelationship(Plc4xSourceProcessor.REL_SUCCESS), false, true);
+		Plc4xCommonTest.assertAvroContent(testRunner.getFlowFilesForRelationship(Plc4xSourceRecordProcessor.REL_SUCCESS), false, true);
     }
 
 	// Test dynamic properties addressess access strategy
@@ -79,4 +82,17 @@ public class Plc4xSourceRecordProcessorTest {
         testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY, new ObjectMapper().writeValueAsString(Plc4xCommonTest.getAddressMap()).toString());
         testAvroRecordWriterProcessor();
     }
+
+    // Test the file address access strategy
+    @Test
+    public void testWithAddressFile() throws InitializationException {
+        testRunner.setProperty(AddressesAccessUtils.ADDRESS_FILE_PROPERTY, "file");
+
+        try (MockedStatic<FilePropertyAccessStrategy> staticMock = Mockito.mockStatic(FilePropertyAccessStrategy.class)) {
+            staticMock.when(() -> FilePropertyAccessStrategy.extractAddressesFromFile("file"))
+                .thenReturn(Plc4xCommonTest.getAddressMap());
+
+            testAvroRecordWriterProcessor();
+        }
+    }
 }
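
Both the text-property tests and the record-processor tests above feed the processors a JSON map of tag name to PLC4X address, produced by serialising Plc4xCommonTest.getAddressMap() with Jackson. A hedged sketch of what such a payload looks like; the tag names and simulated-driver addresses below are made-up examples, not the values in Plc4xCommonTest:

    import java.util.Map;
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class AddressJsonSketch {
        public static void main(String[] args) throws JsonProcessingException {
            // Hypothetical tag-name -> address map (simulated-driver style addresses).
            Map<String, String> addresses = Map.of(
                "var1", "RANDOM/v1:BOOL",
                "var2", "RANDOM/v2:INT");
            // writeValueAsString already returns a String, so no extra toString() is needed.
            String json = new ObjectMapper().writeValueAsString(addresses);
            System.out.println(json);   // e.g. {"var1":"RANDOM/v1:BOOL","var2":"RANDOM/v2:INT"} (key order may vary)
        }
    }
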
diff --git a/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/address/AccessStrategyTest.java b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/address/AccessStrategyTest.java
new file mode 100644
index 0000000000..c244991c61
--- /dev/null
+++ b/plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/test/java/org/apache/plc4x/nifi/address/AccessStrategyTest.java
@@ -0,0 +1,190 @@
+package org.apache.plc4x.nifi.address;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.plc4x.nifi.Plc4xSourceProcessor;
+import org.apache.plc4x.nifi.util.Plc4xCommonTest;
+import org.junit.jupiter.api.Test;
+
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class AccessStrategyTest {
+
+    // Real instance; only the static extractAddressesFromFile call is stubbed via MockedStatic in the tests below
+    FilePropertyAccessStrategy testFileObject = new FilePropertyAccessStrategy();
+
+    private TestRunner testRunner; 
+
+    // Tests that addresses in dynamic properties are read correctly and addresses are cached if no EL is used
+    @Test
+    public void testDynamicPropertyAccessStrategy() {
+
+        DynamicPropertyAccessStrategy testObject = new DynamicPropertyAccessStrategy();
+        testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+        
+        assert testObject.getAllowableValue().equals(AddressesAccessUtils.ADDRESS_PROPERTY);
+        assert testObject.getPropertyDescriptors().isEmpty();
+        
+        Plc4xCommonTest.getAddressMap().forEach((k,v) -> testRunner.setProperty(k, v));
+		
+        FlowFile flowFile = testRunner.enqueue("");
+        
+        Map<String, String> values = testObject.extractAddresses(testRunner.getProcessContext(), flowFile);
+
+        assertTrue(testObject.getCachedAddresses().equals(values));
+        assertTrue(testObject.getCachedAddresses().equals(Plc4xCommonTest.getAddressMap()));
+    }
+
+    // Tests incorrect address detection on dynamic properties
+    @Test
+    public void testDynamicPropertyAccessStrategyIncorrect() {
+        testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+        
+        Plc4xCommonTest.getAddressMap().forEach((k,v) -> testRunner.setProperty(k, "not a correct address"));
+
+        testRunner.assertNotValid();
+    }
+
+    // Tests that the processor is valid if EL is present in dynamic properties
+    @Test
+    public void testDynamicPropertyAccessStrategyELPresent() {
+        testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+        testRunner.setProperty(Plc4xSourceProcessor.PLC_CONNECTION_STRING, "simulated://127.0.0.1");
+        
+        Plc4xCommonTest.getAddressMap().forEach((k,v) -> testRunner.setProperty(k, "${attribute}"));
+
+        testRunner.assertValid();
+    }
+
+    // Tests that addresses in the text property are read correctly and cached if no EL is used
+    @Test
+    public void testTextPropertyAccessStrategy() throws JsonProcessingException {
+
+        TextPropertyAccessStrategy testObject = new TextPropertyAccessStrategy();
+        testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+        
+        assert testObject.getAllowableValue().equals(AddressesAccessUtils.ADDRESS_TEXT);
+        assert testObject.getPropertyDescriptors().contains(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY);
+        
+        testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY, new ObjectMapper().writeValueAsString(Plc4xCommonTest.getAddressMap()).toString());
+		
+        FlowFile flowFile = testRunner.enqueue("");
+        
+        Map<String, String> values = testObject.extractAddresses(testRunner.getProcessContext(), flowFile);
+
+        assertTrue(testObject.getCachedAddresses().equals(values));
+        assertTrue(testObject.getCachedAddresses().equals(Plc4xCommonTest.getAddressMap()));
+    }
+
+    
+
+    // Tests incorrect address detection on the text property
+    @Test
+    public void testTextPropertyAccessStrategyIncorrect() {
+
+        TextPropertyAccessStrategy testObject = new TextPropertyAccessStrategy();
+        testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+        
+        assert testObject.getAllowableValue().equals(AddressesAccessUtils.ADDRESS_TEXT);
+        assert testObject.getPropertyDescriptors().contains(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY);
+        
+        testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY.getName(), "not a correct address");
+
+        testRunner.assertNotValid();
+
+        testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY.getName(), "{\"neither\":\"this one\"}");
+
+        testRunner.assertNotValid();
+    }
+
+    // Tests that the processor is valid if EL is present in the text property
+    @Test
+    public void testTextPropertyAccessStrategyELPresent() {
+
+        TextPropertyAccessStrategy testObject = new TextPropertyAccessStrategy();
+        testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+
+        testRunner.setProperty(Plc4xSourceProcessor.PLC_CONNECTION_STRING, "simulated://127.0.0.1");
+        
+        assert testObject.getAllowableValue().equals(AddressesAccessUtils.ADDRESS_TEXT);
+        assert testObject.getPropertyDescriptors().contains(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY);
+        
+        testRunner.setProperty(AddressesAccessUtils.ADDRESS_TEXT_PROPERTY.getName(), "${attribute}");
+
+        testRunner.assertValid();
+    }
+
+    // Tests that addresses in the file are read correctly and cached if no EL is used
+    @Test
+    public void testFilePropertyAccessStrategy() throws IOException {
+
+        testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+
+        assert testFileObject.getAllowableValue().equals(AddressesAccessUtils.ADDRESS_FILE);
+        assert testFileObject.getPropertyDescriptors().contains(AddressesAccessUtils.ADDRESS_FILE_PROPERTY);
+
+
+        testRunner.setProperty(AddressesAccessUtils.ADDRESS_FILE_PROPERTY, "file");
+
+        try (MockedStatic<FilePropertyAccessStrategy> staticMock = Mockito.mockStatic(FilePropertyAccessStrategy.class)) {
+            staticMock.when(() -> FilePropertyAccessStrategy.extractAddressesFromFile("file"))
+                .thenReturn(Plc4xCommonTest.getAddressMap());
+
+
+            FlowFile flowFile = testRunner.enqueue("");
+            Map<String, String> values = testFileObject.extractAddresses(testRunner.getProcessContext(), flowFile);
+
+            assertTrue(testFileObject.getCachedAddresses().equals(values));
+            assertTrue(testFileObject.getCachedAddresses().equals(Plc4xCommonTest.getAddressMap()));
+        }
+    }
+
+    // Tests incorrect address detection on the address file
+    @Test
+    public void testFilePropertyAccessStrategyIncorrect() throws IOException {
+
+        testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+        
+        assert testFileObject.getAllowableValue().equals(AddressesAccessUtils.ADDRESS_FILE);
+        assert testFileObject.getPropertyDescriptors().contains(AddressesAccessUtils.ADDRESS_FILE_PROPERTY);
+        
+        testRunner.setProperty(AddressesAccessUtils.ADDRESS_FILE_PROPERTY, "file");
+
+        try (MockedStatic<FilePropertyAccessStrategy> staticMock = Mockito.mockStatic(FilePropertyAccessStrategy.class)) {
+            staticMock.when(() -> FilePropertyAccessStrategy.extractAddressesFromFile("file"))
+                .thenReturn(Map.of("not", "a correct address"));
+
+            testRunner.assertNotValid();
+        }
+    }
+
+    // Tests that the processor is valid if EL is present in the address file
+    @Test
+    public void testFilePropertyAccessStrategyELPresent() throws IOException {
+
+        testRunner = TestRunners.newTestRunner(Plc4xSourceProcessor.class);
+
+        testRunner.setProperty(Plc4xSourceProcessor.PLC_CONNECTION_STRING, "simulated://127.0.0.1");
+        
+        assert testFileObject.getAllowableValue().equals(AddressesAccessUtils.ADDRESS_FILE);
+        assert testFileObject.getPropertyDescriptors().contains(AddressesAccessUtils.ADDRESS_FILE_PROPERTY);
+        
+        testRunner.setProperty(AddressesAccessUtils.ADDRESS_FILE_PROPERTY, "file");
+
+        try (MockedStatic<FilePropertyAccessStrategy> staticMock = Mockito.mockStatic(FilePropertyAccessStrategy.class)) {
+            staticMock.when(() -> FilePropertyAccessStrategy.extractAddressesFromFile("file"))
+                .thenReturn(Map.of("EL in use", "${attribute}"));
+
+            testRunner.assertValid();
+        }
+    }
+}
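
Taken together, the new AccessStrategyTest cases pin down one contract from three angles: a value that does not parse as a tag address must make the processor invalid, while any value containing Expression Language must be accepted at configuration time, because it can only be resolved per FlowFile. The validation code these tests exercise lives elsewhere in this commit; purely as an illustration of that contract (the looksLikeTagAddress check is a placeholder, not PLC4X's real tag parser), an EL-aware NiFi validator typically looks like this:

    import org.apache.nifi.components.ValidationContext;
    import org.apache.nifi.components.ValidationResult;
    import org.apache.nifi.components.Validator;

    // Illustration only: accept EL-bearing values, otherwise apply a syntax check.
    public class ElAwareTagValidatorSketch implements Validator {
        @Override
        public ValidationResult validate(String subject, String input, ValidationContext context) {
            if (context.isExpressionLanguagePresent(input)) {
                return new ValidationResult.Builder()
                    .subject(subject).input(input).valid(true)
                    .explanation("Expression Language present; the value is validated at runtime").build();
            }
            // Placeholder check: a real implementation would delegate to the driver's tag parser.
            boolean looksLikeTagAddress = input != null && !input.isBlank() && !input.contains(" ");
            return new ValidationResult.Builder()
                .subject(subject).input(input).valid(looksLikeTagAddress)
                .explanation(looksLikeTagAddress ? "" : "not a parsable tag address").build();
        }
    }
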