You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2019/08/24 17:37:04 UTC

[plc4x] branch featule/kafka-connect-refactoring updated (639c5e8 -> 74c892c)

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a change to branch featule/kafka-connect-refactoring
in repository https://gitbox.apache.org/repos/asf/plc4x.git.


    from 639c5e8  Merge branches 'develop' and 'featule/kafka-connect-refactoring' of https://gitbox.apache.org/repos/asf/plc4x into featule/kafka-connect-refactoring
     add cc9c016  - Added Passive mode templates and a sandbox passive mode s7 driver
     add 2d0a582  Example MSpec for the DF1 protocol added to the website
     add 7fca0a5  Merge pull request #79 from vemmert/site-mspec-df1-example
     add 6220138  - Implement a first partially working passive S7 driver. - Fixed some issues in the S7 mspec - Fixed some issues with optional fields in the code generation - Extended the mspec antlr4 grammar to support "/" and "*" operations
     add 79cdbce  PLC4X-86 - Fix and re-enable tests that were disabled for Java 11 support
     add afaa24c  Merge pull request #78 from thomasdarimont/issue/PLC4X-86-Fix-tests-failing-on-java11
     add 28d8f6c  - Fixed a dependency usage problem.
     add 7537323  - Removed some invalid configuration options raw sockets don't support.
     add cdd53c9  - Increasing the timeout to 24 hours as Jenkins seems to be rather slow at the moment ...
     add a660a46  - Changed the deploy phase to run on a node labeled: nexus-deploy
     new b649586  Merge branches 'develop' and 'featule/kafka-connect-refactoring' of https://gitbox.apache.org/repos/asf/plc4x into featule/kafka-connect-refactoring
     new 74c892c  - Finished a first fully operational version of the Kafka Connect Source

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 Jenkinsfile                                        |   4 +-
 ...geOutput.java => JavaActiveLanguageOutput.java} |   4 +-
 ...eOutput.java => JavaPassiveLanguageOutput.java} |  17 +-
 ...x.plugins.codegenerator.language.LanguageOutput |   3 +-
 .../{io-template.ftlh => active-io-template.ftlh}  |   2 +-
 .../{io-template.ftlh => passive-io-template.ftlh} | 113 +----------
 .../resources/templates/java/pojo-template.ftlh    |   4 +
 .../plugins/codegenerator/language/mspec/MSpec.g4  |   2 +
 .../connection/AdsAbstractPlcConnectionTest.java   |   9 +-
 .../ads/connection/AdsTcpPlcConnectionTests.java   |  26 ++-
 plc4j/integrations/apache-camel/pom.xml            |   8 +-
 .../org/apache/plc4x/camel/Plc4XProducerTest.java  |   9 +-
 .../org/apache/plc4x/kafka/Plc4xSourceTask.java    | 175 +++++++++-------
 .../base/connection/RawSocketChannelFactory.java   |   5 +-
 .../java/base/connection/SerialChannelFactory.java |   2 -
 .../org/apache/plc4x/java/utils/MessageIO.java     |   7 +-
 .../org/apache/plc4x/java/utils/MessageInput.java} |   6 +-
 .../apache/plc4x/java/utils/MessageOutput.java}    |   7 +-
 .../java/utils/rawsockets/netty/PacketHandler.java |   8 +-
 .../utils/rawsockets/netty/RawSocketChannel.java   |  11 +-
 .../rawsockets/netty/RawSocketChannelConfig.java   |  32 ++-
 .../rawsockets/netty/RawSocketChannelOption.java   |   3 +-
 ...tChannelConfig.java => TcpIpPacketHandler.java} |  21 +-
 .../rawsockets/netty/RawSocketChannelTest.java     |   2 -
 .../s7/src/main/resources/protocols/s7/s7.mspec    |  14 +-
 sandbox/pom.xml                                    |   1 +
 .../pom.xml                                        |  16 +-
 .../plc4x/javapassive/s7/PassiveS7PlcDriver.java   |  40 ++--
 .../s7/connection/PassiveS7PlcConnection.java      |  98 +++++++++
 .../s7/protocol/HelloWorldProtocol.java            |  48 +++++
 .../javapassive/s7/protocol/PassiveS7Protocol.java |  62 ++++++
 .../services/org.apache.plc4x.java.spi.PlcDriver   |   2 +-
 .../test/java/BenchmarkGeneratedPassiveS7.java}    |  32 +--
 .../main/resources/protocols/df1/protocol.mspec    | 222 ---------------------
 .../asciidoc/developers/code-gen/protocol/df1.adoc |  94 +++++++++
 src/site/site.xml                                  |   1 +
 36 files changed, 584 insertions(+), 526 deletions(-)
 copy build-utils/language-java/src/main/java/org/apache/plc4x/language/java/{JavaLanguageOutput.java => JavaActiveLanguageOutput.java} (94%)
 rename build-utils/language-java/src/main/java/org/apache/plc4x/language/java/{JavaLanguageOutput.java => JavaPassiveLanguageOutput.java} (77%)
 copy build-utils/language-java/src/main/resources/templates/java/{io-template.ftlh => active-io-template.ftlh} (99%)
 rename build-utils/language-java/src/main/resources/templates/java/{io-template.ftlh => passive-io-template.ftlh} (75%)
 copy plc4j/{protocols/driver-bases/base/src/main/java/org/apache/plc4x/java/base/messages/PlcProtocolMessage.java => utils/driver-base-java/src/main/java/org/apache/plc4x/java/utils/MessageInput.java} (82%)
 copy plc4j/{api/src/main/java/org/apache/plc4x/java/api/messages/PlcWriteResponse.java => utils/driver-base-java/src/main/java/org/apache/plc4x/java/utils/MessageOutput.java} (80%)
 copy sandbox/code-gen/src/main/java/org/apache/plc4x/codegen/ast/Node.java => plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/PacketHandler.java (83%)
 copy plc4j/utils/raw-sockets/src/main/java/org/apache/plc4x/java/utils/rawsockets/netty/{RawSocketChannelConfig.java => TcpIpPacketHandler.java} (59%)
 copy sandbox/{test-java-s7-driver => test-java-passive-s7-driver}/pom.xml (84%)
 copy plc4j/drivers/s7/src/main/java/org/apache/plc4x/java/s7/S7PlcDriver.java => sandbox/test-java-passive-s7-driver/src/main/java/org/apache/plc4x/javapassive/s7/PassiveS7PlcDriver.java (62%)
 create mode 100644 sandbox/test-java-passive-s7-driver/src/main/java/org/apache/plc4x/javapassive/s7/connection/PassiveS7PlcConnection.java
 create mode 100644 sandbox/test-java-passive-s7-driver/src/main/java/org/apache/plc4x/javapassive/s7/protocol/HelloWorldProtocol.java
 create mode 100644 sandbox/test-java-passive-s7-driver/src/main/java/org/apache/plc4x/javapassive/s7/protocol/PassiveS7Protocol.java
 copy {plc4j/api/src/test/resources/test => sandbox/test-java-passive-s7-driver/src/main/resources}/META-INF/services/org.apache.plc4x.java.spi.PlcDriver (93%)
 copy sandbox/{test-java-s7-driver/src/test/java/BenchmarkGeneratedS7.java => test-java-passive-s7-driver/src/test/java/BenchmarkGeneratedPassiveS7.java} (60%)
 delete mode 100644 sandbox/test-java-s7-driver/src/main/resources/protocols/df1/protocol.mspec
 create mode 100644 src/site/asciidoc/developers/code-gen/protocol/df1.adoc


[plc4x] 02/02: - Finished a first fully operational version of the Kafka Connect Source

Posted by cd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch featule/kafka-connect-refactoring
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 74c892cda34d2ac9d1f1b7eb94b2d55bfba60c6e
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Sat Aug 24 19:36:58 2019 +0200

    - Finished a first fully operational version of the Kafka Connect Source
---
 .../org/apache/plc4x/kafka/Plc4xSourceTask.java    | 175 ++++++++++++---------
 1 file changed, 101 insertions(+), 74 deletions(-)

diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
index 87ede4e..81b0d62 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
@@ -27,7 +27,6 @@ import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.scraper.ResultHandler;
 import org.apache.plc4x.java.scraper.config.triggeredscraper.JobConfigurationTriggeredImplBuilder;
 import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImpl;
 import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImplBuilder;
@@ -40,6 +39,10 @@ import org.apache.plc4x.kafka.util.VersionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.*;
 import java.util.concurrent.*;
 
@@ -76,21 +79,13 @@ public class Plc4xSourceTask extends SourceTask {
      */
     private static final String SOURCE_NAME_FIELD = "source-name";
     private static final String JOB_NAME_FIELD = "job-name";
-    private static final String FIELD_NAME_FIELD = "field-name";
 
     private static final Schema KEY_SCHEMA =
         new SchemaBuilder(Schema.Type.STRUCT)
             .field(SOURCE_NAME_FIELD, Schema.STRING_SCHEMA)
             .field(JOB_NAME_FIELD, Schema.STRING_SCHEMA)
-            .field(FIELD_NAME_FIELD, Schema.STRING_SCHEMA)
             .build();
 
-    // Internal properties.
-    private Map<String, String> topics;
-    private PlcDriverManager plcDriverManager;
-    private TriggerCollector triggerCollector;
-    private TriggeredScraperImpl scraper;
-
     // Internal buffer into which all incoming scraper responses are written to.
     private ArrayBlockingQueue<SourceRecord> buffer;
 
@@ -104,7 +99,7 @@ public class Plc4xSourceTask extends SourceTask {
         AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
         String connectionName = config.getString(CONNECTION_NAME_CONFIG);
         String plc4xConnectionString = config.getString(PLC4X_CONNECTION_STRING_CONFIG);
-        topics = new HashMap<>();
+        Map<String, String> topics = new HashMap<>();
         // Create a buffer with a capacity of 1000 elements which schedules access in a fair way.
         buffer = new ArrayBlockingQueue<>(1000, true);
 
@@ -145,51 +140,58 @@ public class Plc4xSourceTask extends SourceTask {
         ScraperConfigurationTriggeredImpl scraperConfig = builder.build();
 
         try {
-            plcDriverManager = new PooledPlcDriverManager();
-            triggerCollector = new TriggerCollectorImpl(plcDriverManager);
-            scraper = new TriggeredScraperImpl(scraperConfig, new ResultHandler() {
-                @Override
-                public void handle(String jobName, String sourceName, Map<String, Object> results) {
-                    Long timestamp = System.currentTimeMillis();
-
-                    Map<String, String> sourcePartition = new HashMap<>();
-                    sourcePartition.put("sourceName", sourceName);
-                    sourcePartition.put("jobName", jobName);
-
-                    Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
-
-                    String topic = topics.get(jobName);
-
-                    for (Map.Entry<String, Object> result : results.entrySet()) {
-                        // Get field-name and -value from the results.
-                        String fieldName = result.getKey();
-                        Object fieldValue = result.getValue();
-
-                        // Prepare the key structure.
-                        Struct key = new Struct(KEY_SCHEMA)
-                            .put(SOURCE_NAME_FIELD, sourceName)
-                            .put(JOB_NAME_FIELD, jobName)
-                            .put(FIELD_NAME_FIELD, fieldName);
-
-                        // Get the schema for the given value type.
-                        Schema valueSchema = getSchema(fieldValue);
-
-                        // Prepare the source-record element.
-                        SourceRecord record =
-                            new SourceRecord(
-                                sourcePartition,
-                                sourceOffset,
-                                topic,
-                                KEY_SCHEMA,
-                                key,
-                                valueSchema,
-                                fieldValue
-                            );
-
-                        // Add the new source-record to the buffer.
-                        buffer.add(record);
-                    }
+            PlcDriverManager plcDriverManager = new PooledPlcDriverManager();
+            TriggerCollector triggerCollector = new TriggerCollectorImpl(plcDriverManager);
+            TriggeredScraperImpl scraper = new TriggeredScraperImpl(scraperConfig, (jobName, sourceName, results) -> {
+                Long timestamp = System.currentTimeMillis();
+
+                Map<String, String> sourcePartition = new HashMap<>();
+                sourcePartition.put("sourceName", sourceName);
+                sourcePartition.put("jobName", jobName);
+
+                Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp);
+
+                String topic = topics.get(jobName);
+
+                // Prepare the key structure.
+                Struct key = new Struct(KEY_SCHEMA)
+                    .put(SOURCE_NAME_FIELD, sourceName)
+                    .put(JOB_NAME_FIELD, jobName);
+
+                // Build the Schema for the result struct.
+                SchemaBuilder recordSchemaBuilder = SchemaBuilder.struct().name("org.apache.plc4x.kafka.JobResult");
+                for (Map.Entry<String, Object> result : results.entrySet()) {
+                    // Get field-name and -value from the results.
+                    String fieldName = result.getKey();
+                    Object fieldValue = result.getValue();
+
+                    // Get the schema for the given value type.
+                    Schema valueSchema = getSchema(fieldValue);
+
+                    // Add the schema description for the current field.
+                    recordSchemaBuilder.field(fieldName, valueSchema);
+                }
+                Schema recordSchema = recordSchemaBuilder.build();
+
+                // Build the struct itself.
+                Struct recordStruct = new Struct(recordSchema);
+                for (Map.Entry<String, Object> result : results.entrySet()) {
+                    // Get field-name and -value from the results.
+                    String fieldName = result.getKey();
+                    Object fieldValue = result.getValue();
+                    recordStruct.put(fieldName, fieldValue);
                 }
+
+                // Prepare the source-record element.
+                SourceRecord record = new SourceRecord(
+                    sourcePartition, sourceOffset,
+                    topic,
+                    KEY_SCHEMA, key,
+                    recordSchema, recordStruct
+                    );
+
+                // Add the new source-record to the buffer.
+                buffer.add(record);
             }, triggerCollector);
             scraper.start();
             triggerCollector.start();
@@ -221,33 +223,58 @@ public class Plc4xSourceTask extends SourceTask {
     private Schema getSchema(Object value) {
         Objects.requireNonNull(value);
 
-        if (value instanceof Byte)
-            return Schema.INT8_SCHEMA;
-
-        if (value instanceof Short)
-            return Schema.INT16_SCHEMA;
-
-        if (value instanceof Integer)
-            return Schema.INT32_SCHEMA;
+        if(value instanceof List) {
+            List list = (List) value;
+            if(list.isEmpty()) {
+                throw new ConnectException("Unsupported empty lists.");
+            }
+            // In PLC4X list elements all contain the same type.
+            Object firstElement = list.get(0);
+            Schema elementSchema = getSchema(firstElement);
+            return SchemaBuilder.array(elementSchema).build();
+        }
+        if (value instanceof BigDecimal) {
 
-        if (value instanceof Long)
-            return Schema.INT64_SCHEMA;
+        }
+        if (value instanceof BigDecimal) {
 
-        if (value instanceof Float)
+        }
+        if (value instanceof Boolean) {
+            return Schema.BOOLEAN_SCHEMA;
+        }
+        if (value instanceof byte[]) {
+            return Schema.BYTES_SCHEMA;
+        }
+        if (value instanceof Byte) {
+            return Schema.INT8_SCHEMA;
+        }
+        if (value instanceof Double) {
+            return Schema.FLOAT64_SCHEMA;
+        }
+        if (value instanceof Float) {
             return Schema.FLOAT32_SCHEMA;
+        }
+        if (value instanceof Integer) {
+            return Schema.INT32_SCHEMA;
+        }
+        if (value instanceof LocalDate) {
 
-        if (value instanceof Double)
-            return Schema.FLOAT64_SCHEMA;
+        }
+        if (value instanceof LocalDateTime) {
 
-        if (value instanceof Boolean)
-            return Schema.BOOLEAN_SCHEMA;
+        }
+        if (value instanceof LocalTime) {
 
-        if (value instanceof String)
+        }
+        if (value instanceof Long) {
+            return Schema.INT64_SCHEMA;
+        }
+        if (value instanceof Short) {
+            return Schema.INT16_SCHEMA;
+        }
+        if (value instanceof String) {
             return Schema.STRING_SCHEMA;
-
-        if (value instanceof byte[])
-            return Schema.BYTES_SCHEMA;
-
+        }
         // TODO: add support for collective and complex types
         throw new ConnectException(String.format("Unsupported data type %s", value.getClass().getName()));
     }


[plc4x] 01/02: Merge branches 'develop' and 'featule/kafka-connect-refactoring' of https://gitbox.apache.org/repos/asf/plc4x into featule/kafka-connect-refactoring

Posted by cd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch featule/kafka-connect-refactoring
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit b6495869dcc8b649df18811c439516ce154ba5b9
Merge: 639c5e8 a660a46
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Sat Aug 24 17:33:25 2019 +0200

    Merge branches 'develop' and 'featule/kafka-connect-refactoring' of https://gitbox.apache.org/repos/asf/plc4x into featule/kafka-connect-refactoring

 Jenkinsfile                                        |   4 +-
 ...geOutput.java => JavaActiveLanguageOutput.java} |   4 +-
 ...eOutput.java => JavaPassiveLanguageOutput.java} |  17 +-
 ...x.plugins.codegenerator.language.LanguageOutput |   3 +-
 .../{io-template.ftlh => active-io-template.ftlh}  |   2 +-
 .../{io-template.ftlh => passive-io-template.ftlh} | 113 +----------
 .../resources/templates/java/pojo-template.ftlh    |   4 +
 .../plugins/codegenerator/language/mspec/MSpec.g4  |   2 +
 .../connection/AdsAbstractPlcConnectionTest.java   |   9 +-
 .../ads/connection/AdsTcpPlcConnectionTests.java   |  26 ++-
 plc4j/integrations/apache-camel/pom.xml            |   8 +-
 .../org/apache/plc4x/camel/Plc4XProducerTest.java  |   9 +-
 .../base/connection/RawSocketChannelFactory.java   |   5 +-
 .../java/base/connection/SerialChannelFactory.java |   2 -
 .../org/apache/plc4x/java/utils/MessageIO.java     |   7 +-
 .../org/apache/plc4x/java/utils/MessageInput.java} |  12 +-
 .../apache/plc4x/java/utils/MessageOutput.java}    |  12 +-
 ...SocketChannelOption.java => PacketHandler.java} |  10 +-
 .../utils/rawsockets/netty/RawSocketChannel.java   |  11 +-
 .../rawsockets/netty/RawSocketChannelConfig.java   |  32 ++-
 .../rawsockets/netty/RawSocketChannelOption.java   |   3 +-
 ...tChannelConfig.java => TcpIpPacketHandler.java} |  21 +-
 .../rawsockets/netty/RawSocketChannelTest.java     |   2 -
 .../s7/src/main/resources/protocols/s7/s7.mspec    |  14 +-
 sandbox/pom.xml                                    |   1 +
 sandbox/test-java-passive-s7-driver/pom.xml        | 101 ++++++++++
 .../plc4x/javapassive/s7/PassiveS7PlcDriver.java   |  89 +++++++++
 .../s7/connection/PassiveS7PlcConnection.java      |  98 +++++++++
 .../s7/protocol/HelloWorldProtocol.java            |  48 +++++
 .../javapassive/s7/protocol/PassiveS7Protocol.java |  62 ++++++
 .../services/org.apache.plc4x.java.spi.PlcDriver   |   4 +-
 .../src/test/java/BenchmarkGeneratedPassiveS7.java |  45 +++++
 .../main/resources/protocols/df1/protocol.mspec    | 222 ---------------------
 .../asciidoc/developers/code-gen/protocol/df1.adoc |  94 +++++++++
 src/site/site.xml                                  |   1 +
 35 files changed, 678 insertions(+), 419 deletions(-)