You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@plc4x.apache.org by GitBox <gi...@apache.org> on 2022/08/02 14:10:38 UTC

[GitHub] [plc4x] ottobackwards commented on a diff in pull request #439: feature/nifi-record-2

ottobackwards commented on code in PR #439:
URL: https://github.com/apache/plc4x/pull/439#discussion_r935636816


##########
plc4j/integrations/apache-nifi/nifi-plc4x-nar/pom.xml:
##########
@@ -18,146 +18,188 @@
   under the License.
   -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
+	<modelVersion>4.0.0</modelVersion>
 
-  <parent>
-    <groupId>org.apache.plc4x</groupId>
-    <artifactId>plc4j-apache-nifi</artifactId>
-    <version>0.10.0-SNAPSHOT</version>
-  </parent>
+	<parent>
+		<groupId>org.apache.plc4x</groupId>
+		<artifactId>plc4j-apache-nifi</artifactId>
+		<version>0.10.0-SNAPSHOT</version>
+	</parent>
 
-  <artifactId>plc4j-nifi-plc4x-nar</artifactId>
-  <packaging>nar</packaging>
+	<artifactId>plc4j-nifi-plc4x-nar</artifactId>
+	<packaging>nar</packaging>
 
-  <name>PLC4J: Integrations: Apache Nifi: NAR</name>
+	<name>PLC4J: Integrations: Apache Nifi: NAR</name>
 
-  <properties>
-    <maven.javadoc.skip>true</maven.javadoc.skip>
-    <source.skip>true</source.skip>
-  </properties>
+	<properties>
+		<maven.javadoc.skip>true</maven.javadoc.skip>
+		<source.skip>true</source.skip>
+	</properties>
 
-  <build>
-    <pluginManagement>
-      <plugins>
-        <plugin>
-          <groupId>org.owasp</groupId>
-          <artifactId>dependency-check-maven</artifactId>
-          <configuration>
-            <suppressionFiles>${project.basedir}/false-positives.xml</suppressionFiles>
-          </configuration>
-        </plugin>
-        <plugin>
-          <groupId>org.apache.maven.plugins</groupId>
-          <artifactId>maven-dependency-plugin</artifactId>
-          <configuration>
-            <usedDependencies>
-              <usedDependency>org.apache.plc4x:plc4j-nifi-plc4x-processors</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-api</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-driver-ab-eth</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-driver-ads</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-driver-canopen</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-driver-eip</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-driver-knxnetip</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-driver-modbus</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-driver-opcua</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-driver-s7</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-driver-simulated</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-transport-pcap-replay</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-transport-raw-socket</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-transport-serial</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-transport-tcp</usedDependency>
-              <usedDependency>org.apache.plc4x:plc4j-transport-udp</usedDependency>
-            </usedDependencies>
-          </configuration>
-        </plugin>
-      </plugins>
-    </pluginManagement>
-  </build>
+	<build>
+		<pluginManagement>
+			<plugins>
+				<plugin>
+					<groupId>org.owasp</groupId>
+					<artifactId>dependency-check-maven</artifactId>
+					<configuration>
+						<suppressionFiles>${project.basedir}/false-positives.xml</suppressionFiles>
+					</configuration>
+				</plugin>
+				<plugin>
+					<groupId>org.apache.maven.plugins</groupId>
+					<artifactId>maven-dependency-plugin</artifactId>
+					<configuration>
+						<usedDependencies>
+							<usedDependency>org.apache.plc4x:plc4j-nifi-plc4x-processors</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-api</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-driver-ab-eth</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-driver-ads</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-driver-canopen</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-driver-eip</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-driver-knxnetip</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-driver-modbus</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-driver-opcua</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-driver-s7</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-driver-simulated</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-transport-pcap-replay</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-transport-raw-socket</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-transport-serial</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-transport-tcp</usedDependency>
+							<usedDependency>org.apache.plc4x:plc4j-transport-udp</usedDependency>
+						</usedDependencies>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+	</build>
 
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-nifi-plc4x-processors</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <!-- PLC4X -->
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-api</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <!-- Bundle Drivers -->
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-driver-ab-eth</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-driver-ads</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-driver-canopen</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-driver-eip</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-driver-knxnetip</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-driver-modbus</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-driver-opcua</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-driver-s7</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-driver-simulated</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-nifi-plc4x-processors</artifactId>
+			<version>${nifi-plc4x.version}</version>
+		</dependency>
 
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-transport-pcap-replay</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-transport-raw-socket</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-transport-serial</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-transport-tcp</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4j-transport-udp</artifactId>
-      <version>0.10.0-SNAPSHOT</version>
-    </dependency>
-  </dependencies>
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>nifi-standard-services-api-nar</artifactId>
+			<version>${nifi.version}</version>
+			<type>nar</type>
+			<scope>provided</scope>
+		</dependency>
 
-</project>
+		<!-- PLC4X -->
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-api</artifactId>
+			<version>${nifi-plc4x.version}</version>
+		</dependency>
+		<!-- Bundle Drivers -->
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-driver-ab-eth</artifactId>
+			<version>${nifi-plc4x.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-driver-ads</artifactId>
+			<version>${nifi-plc4x.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-driver-canopen</artifactId>
+			<version>${nifi-plc4x.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-driver-eip</artifactId>
+			<version>${nifi-plc4x.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-driver-knxnetip</artifactId>
+			<version>${nifi-plc4x.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.plc4x</groupId>
+			<artifactId>plc4j-driver-modbus</artifactId>
+			<version>${nifi-plc4x.version}</version>

Review Comment:
   remove commented entries



##########
plc4j/integrations/apache-nifi/README.md:
##########
@@ -0,0 +1,90 @@
+# PLC4X Apache NiFi Integration
+
+## Plc4xSinkProcessor
+
+## Plc4xSourceProcessor
+
+## Plc4xSourceRecordProcessor
+
+This processor is <ins>record oriented</ins>, formatting output flowfile content using a Record Writer (for further information see [NiFi Documentation](https://nifi.apache.org/docs/nifi-docs/html/record-path-guide.html#overview)). An example of operation is included on the following path:
+*./test-nifi-template/NIFI-PLC4XIntegration-record-example-1.12.xml*. This file is a NiFi template that can be imported directly from the NiFi UI to test the operation.
+
+The Plc4xSourceRecord Processor can be configured using the following **properties**:
+
+- *PLC connection String:* PLC4X connection string used to connect to a given PLC device.
+- *PLC resource address String:* PLC4X address string used to identify the resource to read/write on a given PLC device (multiple values supported). The expected format is: {name}={address}(;{name}={address})*
+- *Record Writer:* Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer may use Inherit Schema to emulate the inferred schema behavior, i.e. an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types.
+- *Force Reconnect every request:* Specifies if the connection to the PLC will be recreated on every trigger event
+
+An *example* of these properties for reading values from an S7-1200:
+
+- *PLC connection String:* *s7://10.105.143.7:102?remote-rack=0&remote-slot=1&controller-type=S7_1200*
+- *PLC resource address String:* *var1=%DB1:DBX0.0:BOOL;var2=%DB1:DBX0.1:BOOL;var3=%DB1:DBB01:BYTE;var4=%DB1:DBW02:WORD;var5=%DB1:DBW04:INT*
+- *Record Writer:* *PLC4x Embedded - AvroRecordSetWriter*
+- *Force Reconnect every request:* *false*
+
+For the **Record Writer** property, any writer included in NiFi could be used, such as JSON, CSV, etc. (custom writers can also be created). In this example, an Avro Writer is supplied, configured as follows:
+
+- *Schema Write Strategy:* Embed Avro Schema
+- *Schema Cache:* No value set
+- *Schema Protocol Version:* 1
+- *Schema Access Strategy:* Inherit Record Schema
+- *Schema Registry:* No value set
+- *Schema Name:* ${schema.name}
+- *Schema Version:* No value set
+- *Schema Branch:* No value set
+- *Schema Text:* ${avro.schema}
+- *Compression Format:* NONE
+- *Cache Size:* 1000
+- *Encoder Pool Size:* 32
+
+
+The output flowfile will contain the PLC read values. This information is included in the flowfile content, following the Record Oriented presentation using a **schema** and the configuration specified in the Record Writer (format, schema inclusion, etc.). In the schema, one field will be included for each of the variables defined in the Processor's *PLC resource address String:* property, taking into account the specified datatype. Also, a *ts* (timestamp) field is included, containing the read date. An example of the content of a flowfile for the previously defined properties:

Review Comment:
   a schema



##########
plc4j/integrations/apache-nifi/nifi-plc4x-processors/src/main/java/org/apache/plc4x/nifi/record/Plc4xReadResponseRecordSet.java:
##########
@@ -0,0 +1,128 @@
+package org.apache.plc4x.nifi.record;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.avro.Schema;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.value.PlcValue;
+import org.apache.plc4x.nifi.util.Plc4xCommon;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class Plc4xReadResponseRecordSet implements RecordSet, Closeable {
+    private static final Logger logger = LoggerFactory.getLogger(Plc4xReadResponseRecordSet.class);
+    private final PlcReadResponse readResponse;
+    private final Set<String> rsColumnNames;
+    private boolean moreRows;
+
+    // TODO: review this AtomicReference?
+	// TODO: this could be enhanced checking if record schema should be updated (via a cache boolean, checking property values is a nifi expression language, etc)
+  	private AtomicReference<RecordSchema> recordSchema;
+  	// inigo private AtomicReference<Map<String, RecordSchema>> map 
+
+    public Plc4xReadResponseRecordSet(final PlcReadResponse readResponse) throws IOException {
+        this.readResponse = readResponse;
+        moreRows = true;
+        
+        logger.debug("Creating record schema from PlcReadResponse");
+        Map<String, ? extends PlcValue> responseDataStructure = readResponse.getAsPlcValue().getStruct();
+        rsColumnNames = responseDataStructure.keySet();
+        

Review Comment:
   English, please.



##########
plc4j/integrations/apache-nifi/README.md:
##########
@@ -0,0 +1,90 @@
+# PLC4X Apache NiFi Integration
+
+## Plc4xSinkProcessor
+
+## Plc4xSourceProcessor
+
+## Plc4xSourceRecordProcessor
+
+This processor is <ins>record oriented</ins>, formatting output flowfile content using a Record Writer (for further information see [NiFi Documentation](https://nifi.apache.org/docs/nifi-docs/html/record-path-guide.html#overview)). An example of operation is included on the following path:
+*./test-nifi-template/NIFI-PLC4XIntegration-record-example-1.12.xml*. This file is a NiFi template that can be imported directly from the NiFi UI to test the operation.
+
+The Plc4xSourceRecord Processor can be configured using the following **properties**:
+
+- *PLC connection String:* PLC4X connection string used to connect to a given PLC device.
+- *PLC resource address String:* PLC4X address string used to identify the resource to read/write on a given PLC device (multiple values supported). The expected format is: {name}={address}(;{name}={address})*
+- *Record Writer:* Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer may use Inherit Schema to emulate the inferred schema behavior, i.e. an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types.
+- *Force Reconnect every request:* Specifies if the connection to the PLC will be recreated on every trigger event
+
+An *example* of these properties for reading values from an S7-1200:
+
+- *PLC connection String:* *s7://10.105.143.7:102?remote-rack=0&remote-slot=1&controller-type=S7_1200*
+- *PLC resource address String:* *var1=%DB1:DBX0.0:BOOL;var2=%DB1:DBX0.1:BOOL;var3=%DB1:DBB01:BYTE;var4=%DB1:DBW02:WORD;var5=%DB1:DBW04:INT*
+- *Record Writer:* *PLC4x Embedded - AvroRecordSetWriter*
+- *Force Reconnect every request:* *false*
+
+For the **Record Writer** property, any writer included in NiFi could be used, such as JSON, CSV, etc. (custom writers can also be created). In this example, an Avro Writer is supplied, configured as follows:
+
+- *Schema Write Strategy:* Embed Avro Schema
+- *Schema Cache:* No value set
+- *Schema Protocol Version:* 1
+- *Schema Access Strategy:* Inherit Record Schema
+- *Schema Registry:* No value set
+- *Schema Name:* ${schema.name}
+- *Schema Version:* No value set
+- *Schema Branch:* No value set
+- *Schema Text:* ${avro.schema}
+- *Compression Format:* NONE
+- *Cache Size:* 1000
+- *Encoder Pool Size:* 32
+
+
+The output flowfile will contain the PLC read values. This information is included in the flowfile content, following the Record Oriented presentation using a **schema** and the configuration specified in the Record Writer (format, schema inclusion, etc.). In the schema, one field will be included for each of the variables defined in the Processor's *PLC resource address String:* property, taking into account the specified datatype. Also, a *ts* (timestamp) field is included, containing the read date. An example of the content of a flowfile for the previously defined properties:
+
+```
+[ {

Review Comment:
   I'm confused: the variables above don't have the "_type" on them. Is that added, or are these names wrong? They should match.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@plc4x.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org