You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2019/08/27 13:58:10 UTC

[plc4x] 22/44: add implementation for configuration

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 76e27ee70ed07d96c2d6eb9f7123da88110ac12d
Author: Stefan Herrmann <st...@codecentric.de>
AuthorDate: Fri Aug 16 17:03:49 2019 +0200

    add implementation for configuration
---
 plc4j/integrations/logstash-plugin/pom.xml         |  7 ++-
 .../java/org/apache/plc4x/logstash/Plc4xInput.java | 58 ++++++++++------------
 .../apache/plc4x/logstash/configuration/Job.java   | 36 ++++++++++++++
 .../plc4x/logstash/configuration/Source.java       | 27 ++++++++++
 .../org/apache/plc4x/logstash/Plc4xInputTest.java  | 32 ++++++------
 5 files changed, 108 insertions(+), 52 deletions(-)

diff --git a/plc4j/integrations/logstash-plugin/pom.xml b/plc4j/integrations/logstash-plugin/pom.xml
index 329409e..c3b49c9 100644
--- a/plc4j/integrations/logstash-plugin/pom.xml
+++ b/plc4j/integrations/logstash-plugin/pom.xml
@@ -162,7 +162,12 @@
       <artifactId>plc4x-tools-logstash-core</artifactId>
       <version>${project.version}-${logstash.version}</version>
     </dependency>
-
+    <dependency>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4j-driver-simulated</artifactId>
+      <version>0.5.0-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
diff --git a/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4xInput.java b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4xInput.java
index dc37936..244ef9e 100644
--- a/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4xInput.java
+++ b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4xInput.java
@@ -29,6 +29,8 @@ import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl;
 import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollector;
 import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollectorImpl;
 import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
+import org.apache.plc4x.logstash.configuration.Job;
+import org.apache.plc4x.logstash.configuration.Source;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -41,13 +43,13 @@ import java.util.function.Consumer;
 @LogstashPlugin(name="plc4x_input")
 public class Plc4xInput implements Input {
 
-    public static final PluginConfigSpec<List<Object>> QUERY_CONFIG =
-            PluginConfigSpec.arraySetting("queries");
+    public static final PluginConfigSpec<Map<String, Object>> JOB_CONFIG =
+            PluginConfigSpec.hashSetting("jobs");
 
-    public static final PluginConfigSpec<String> CONNECTION_STRING_CONFIG =
-            PluginConfigSpec.requiredStringSetting("connection_string");
-    private final String connectionString;
-    private final List<Object> queries;
+    public static final PluginConfigSpec<Map<String, Object>> SOURCE_CONFIG =
+            PluginConfigSpec.hashSetting("sources");
+    private final Map<String, Object> sources;
+    private final Map<String, Object> jobs;
 
     private String id;
     private PlcDriverManager plcDriverManager;
@@ -60,8 +62,8 @@ public class Plc4xInput implements Input {
     public Plc4xInput(String id, Configuration config, Context context) {
         // constructors should validate configuration options
         this.id = id;
-        queries = config.get(QUERY_CONFIG);
-        connectionString = config.get(CONNECTION_STRING_CONFIG);
+        jobs = config.get(JOB_CONFIG);
+        sources = config.get(SOURCE_CONFIG);
     }
 
     @Override
@@ -78,42 +80,32 @@ public class Plc4xInput implements Input {
         // Establish a connection to the plc using the url provided as first argument
         ScraperConfigurationTriggeredImplBuilder builder = new ScraperConfigurationTriggeredImplBuilder();
         //TODO: use multiple sources:
-        String connectionName = "connection1";
-        builder.addSource(connectionName, connectionString);
 
-        List<Object> jobConfigs = queries;
-
-        for (Object jobConfig : jobConfigs) {
-            if (!(jobConfig instanceof String)){
-                System.err.println("Query String is not String!");
-                continue;
-            }
-            String config = (String) jobConfig;
-            String[] jobConfigSegments = config.split(":");
-            if(jobConfigSegments.length < 4) {
-                //TODO: use logging from logstash
-                System.out.println(String.format("Error in job configuration '%s'. " +
-                    "The configuration expects at least 4 segments: " +
-                    "{job-name}:{rate}(:{field-alias}#{field-address})+", jobConfig));
-                continue;
-            }
+        for (String sourceName : sources.keySet()) {
+            //TODO: check !
+            builder.addSource(sourceName, ((Source) sources.get(sourceName)).getConnectionUri());
+        }
 
-            String jobName = jobConfigSegments[0];
-            Integer rate = Integer.valueOf(jobConfigSegments[1]);
+        for (String jobName : jobs.keySet()) {
+            Job job = ((Job) jobs.get(jobName));
             JobConfigurationTriggeredImplBuilder jobBuilder = builder.job(
-                jobName, String.format("(SCHEDULED,%s)", rate)).source(connectionName);
-            for(int i = 3; i < jobConfigSegments.length; i++) {
-                String[] fieldSegments = jobConfigSegments[i].split("=");
+                jobName, String.format("(SCHEDULED,%s)", job.getRate()));
+            for (String source : job.getSources()) {
+                jobBuilder.source(source);
+            }
+            for (String query : job.getQueries()) {
+                String[] fieldSegments = query.split("=");
                 if(fieldSegments.length != 2) {
                     System.err.println(String.format("Error in job configuration '%s'. " +
                             "The field segment expects a format {field-alias}#{field-address}, but got '%s'",
-                        jobName, jobConfigSegments[i]));
+                        jobName, query));
                     continue;
                 }
                 String fieldAlias = fieldSegments[0];
                 String fieldAddress = fieldSegments[1];
                 jobBuilder.field(fieldAlias, fieldAddress);
             }
+            jobBuilder.build();
         }
 
         ScraperConfigurationTriggeredImpl scraperConfig = builder.build();
@@ -148,7 +140,7 @@ public class Plc4xInput implements Input {
     @Override
     public Collection<PluginConfigSpec<?>> configSchema() {
         // should return a list of all configuration options for this plugin
-        return Arrays.asList(QUERY_CONFIG, CONNECTION_STRING_CONFIG);
+        return Arrays.asList(JOB_CONFIG, SOURCE_CONFIG);
     }
 
     @Override
diff --git a/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/configuration/Job.java b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/configuration/Job.java
new file mode 100644
index 0000000..ee86bdb
--- /dev/null
+++ b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/configuration/Job.java
@@ -0,0 +1,36 @@
+package org.apache.plc4x.logstash.configuration;
+
+import java.util.List;
+
+public class Job {
+    private final Integer rate;
+    private final List<String> queries;
+    private final List<String> sources;
+
+    public Job(Integer rate, List<String> queries, List<String> sources) {
+        this.rate = rate;
+        this.queries = queries;
+        this.sources = sources;
+    }
+
+    @Override
+    public String toString() {
+        return "Job{" +
+            "rate=" + rate +
+            ", queries=" + queries +
+            ", sources=" + sources +
+            '}';
+    }
+
+    public Integer getRate() {
+        return rate;
+    }
+
+    public List<String> getQueries() {
+        return queries;
+    }
+
+    public List<String> getSources() {
+        return sources;
+    }
+}
diff --git a/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/configuration/Source.java b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/configuration/Source.java
new file mode 100644
index 0000000..b393b37
--- /dev/null
+++ b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/configuration/Source.java
@@ -0,0 +1,27 @@
+package org.apache.plc4x.logstash.configuration;
+
+public class Source {
+    private final String connectionUri;
+    private final String sourceAlias;
+
+    @Override
+    public String toString() {
+        return "Source{" +
+            "connectionUri='" + connectionUri + '\'' +
+            ", sourceAlias='" + sourceAlias + '\'' +
+            '}';
+    }
+
+    public String getConnectionUri() {
+        return connectionUri;
+    }
+
+    public String getSourceAlias() {
+        return sourceAlias;
+    }
+
+    public Source(String connectionUri, String sourceAlias) {
+        this.connectionUri = connectionUri;
+        this.sourceAlias = sourceAlias;
+    }
+}
diff --git a/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java b/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java
index 16db66c..4824e3c 100644
--- a/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java
+++ b/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java
@@ -19,39 +19,35 @@ under the License.
 package org.apache.plc4x.logstash;
 
 import co.elastic.logstash.api.Configuration;
-import org.apache.commons.lang3.StringUtils;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
+import org.apache.plc4x.logstash.configuration.Job;
+import org.apache.plc4x.logstash.configuration.Source;
+import org.assertj.core.util.Maps;
+import org.junit.jupiter.api.Test;
 import org.logstash.plugins.ConfigurationImpl;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.function.Consumer;
 
 public class Plc4xInputTest {
 
     @Test
-    @Ignore
     public void testPlc4xInput() {
-        String prefix = "This is message";
-        long eventCount = 5;
         Map<String, Object> configValues = new HashMap<>();
-//        configValues.put(Plc4xInput.PREFIX_CONFIG.name(), prefix);
-//        configValues.put(Plc4xInput.EVENT_COUNT_CONFIG.name(), eventCount);
+
+        Job job1 = new Job(300, Arrays.asList("testfield=RANDOM/foo:INTEGER"), Arrays.asList("TestConnection"));
+        Source testConnection = new Source("test:hurzpurzfurz", "TestConnection");
+
+        configValues.put(Plc4xInput.SOURCE_CONFIG.name(), Maps.newHashMap(testConnection.getSourceAlias(), testConnection));
+        configValues.put(Plc4xInput.JOB_CONFIG.name(),  Maps.newHashMap("job1", job1));
+
+
         Configuration config = new ConfigurationImpl(configValues);
         Plc4xInput input = new Plc4xInput("test-id", config, null);
         TestConsumer testConsumer = new TestConsumer();
         input.start(testConsumer);
 
         List<Map<String, Object>> events = testConsumer.getEvents();
-        Assert.assertEquals(eventCount, events.size());
-        for (int k = 1; k <= events.size(); k++) {
-            Assert.assertEquals(prefix + " " + StringUtils.center(k + " of " + eventCount, 20),
-                    events.get(k - 1).get("message"));
-        }
+        System.out.println(events.size());
     }
 
     private static class TestConsumer implements Consumer<Map<String, Object>> {