You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2019/08/27 13:58:10 UTC
[plc4x] 22/44: add implementation for configuration
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 76e27ee70ed07d96c2d6eb9f7123da88110ac12d
Author: Stefan Herrmann <st...@codecentric.de>
AuthorDate: Fri Aug 16 17:03:49 2019 +0200
add implementation for configuration
---
plc4j/integrations/logstash-plugin/pom.xml | 7 ++-
.../java/org/apache/plc4x/logstash/Plc4xInput.java | 58 ++++++++++------------
.../apache/plc4x/logstash/configuration/Job.java | 36 ++++++++++++++
.../plc4x/logstash/configuration/Source.java | 27 ++++++++++
.../org/apache/plc4x/logstash/Plc4xInputTest.java | 32 ++++++------
5 files changed, 108 insertions(+), 52 deletions(-)
diff --git a/plc4j/integrations/logstash-plugin/pom.xml b/plc4j/integrations/logstash-plugin/pom.xml
index 329409e..c3b49c9 100644
--- a/plc4j/integrations/logstash-plugin/pom.xml
+++ b/plc4j/integrations/logstash-plugin/pom.xml
@@ -162,7 +162,12 @@
<artifactId>plc4x-tools-logstash-core</artifactId>
<version>${project.version}-${logstash.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.plc4x</groupId>
+ <artifactId>plc4j-driver-simulated</artifactId>
+ <version>0.5.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
diff --git a/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4xInput.java b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4xInput.java
index dc37936..244ef9e 100644
--- a/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4xInput.java
+++ b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4xInput.java
@@ -29,6 +29,8 @@ import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl;
import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollector;
import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollectorImpl;
import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
+import org.apache.plc4x.logstash.configuration.Job;
+import org.apache.plc4x.logstash.configuration.Source;
import java.util.Arrays;
import java.util.Collection;
@@ -41,13 +43,13 @@ import java.util.function.Consumer;
@LogstashPlugin(name="plc4x_input")
public class Plc4xInput implements Input {
- public static final PluginConfigSpec<List<Object>> QUERY_CONFIG =
- PluginConfigSpec.arraySetting("queries");
+ public static final PluginConfigSpec<Map<String, Object>> JOB_CONFIG =
+ PluginConfigSpec.hashSetting("jobs");
- public static final PluginConfigSpec<String> CONNECTION_STRING_CONFIG =
- PluginConfigSpec.requiredStringSetting("connection_string");
- private final String connectionString;
- private final List<Object> queries;
+ public static final PluginConfigSpec<Map<String, Object>> SOURCE_CONFIG =
+ PluginConfigSpec.hashSetting("sources");
+ private final Map<String, Object> sources;
+ private final Map<String, Object> jobs;
private String id;
private PlcDriverManager plcDriverManager;
@@ -60,8 +62,8 @@ public class Plc4xInput implements Input {
public Plc4xInput(String id, Configuration config, Context context) {
// constructors should validate configuration options
this.id = id;
- queries = config.get(QUERY_CONFIG);
- connectionString = config.get(CONNECTION_STRING_CONFIG);
+ jobs = config.get(JOB_CONFIG);
+ sources = config.get(SOURCE_CONFIG);
}
@Override
@@ -78,42 +80,32 @@ public class Plc4xInput implements Input {
// Establish a connection to the plc using the url provided as first argument
ScraperConfigurationTriggeredImplBuilder builder = new ScraperConfigurationTriggeredImplBuilder();
//TODO: use multiple sources:
- String connectionName = "connection1";
- builder.addSource(connectionName, connectionString);
- List<Object> jobConfigs = queries;
-
- for (Object jobConfig : jobConfigs) {
- if (!(jobConfig instanceof String)){
- System.err.println("Query String is not String!");
- continue;
- }
- String config = (String) jobConfig;
- String[] jobConfigSegments = config.split(":");
- if(jobConfigSegments.length < 4) {
- //TODO: use logging from logstash
- System.out.println(String.format("Error in job configuration '%s'. " +
- "The configuration expects at least 4 segments: " +
- "{job-name}:{rate}(:{field-alias}#{field-address})+", jobConfig));
- continue;
- }
+ for (String sourceName : sources.keySet()) {
+ //TODO: check !
+ builder.addSource(sourceName, ((Source) sources.get(sourceName)).getConnectionUri());
+ }
- String jobName = jobConfigSegments[0];
- Integer rate = Integer.valueOf(jobConfigSegments[1]);
+ for (String jobName : jobs.keySet()) {
+ Job job = ((Job) jobs.get(jobName));
JobConfigurationTriggeredImplBuilder jobBuilder = builder.job(
- jobName, String.format("(SCHEDULED,%s)", rate)).source(connectionName);
- for(int i = 3; i < jobConfigSegments.length; i++) {
- String[] fieldSegments = jobConfigSegments[i].split("=");
+ jobName, String.format("(SCHEDULED,%s)", job.getRate()));
+ for (String source : job.getSources()) {
+ jobBuilder.source(source);
+ }
+ for (String query : job.getQueries()) {
+ String[] fieldSegments = query.split("=");
if(fieldSegments.length != 2) {
System.err.println(String.format("Error in job configuration '%s'. " +
"The field segment expects a format {field-alias}#{field-address}, but got '%s'",
- jobName, jobConfigSegments[i]));
+ jobName, query));
continue;
}
String fieldAlias = fieldSegments[0];
String fieldAddress = fieldSegments[1];
jobBuilder.field(fieldAlias, fieldAddress);
}
+ jobBuilder.build();
}
ScraperConfigurationTriggeredImpl scraperConfig = builder.build();
@@ -148,7 +140,7 @@ public class Plc4xInput implements Input {
@Override
public Collection<PluginConfigSpec<?>> configSchema() {
// should return a list of all configuration options for this plugin
- return Arrays.asList(QUERY_CONFIG, CONNECTION_STRING_CONFIG);
+ return Arrays.asList(JOB_CONFIG, SOURCE_CONFIG);
}
@Override
diff --git a/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/configuration/Job.java b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/configuration/Job.java
new file mode 100644
index 0000000..ee86bdb
--- /dev/null
+++ b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/configuration/Job.java
@@ -0,0 +1,36 @@
+package org.apache.plc4x.logstash.configuration;
+
+import java.util.List;
+
+public class Job {
+ private final Integer rate;
+ private final List<String> queries;
+ private final List<String> sources;
+
+ public Job(Integer rate, List<String> queries, List<String> sources) {
+ this.rate = rate;
+ this.queries = queries;
+ this.sources = sources;
+ }
+
+ @Override
+ public String toString() {
+ return "Job{" +
+ "rate=" + rate +
+ ", queries=" + queries +
+ ", sources=" + sources +
+ '}';
+ }
+
+ public Integer getRate() {
+ return rate;
+ }
+
+ public List<String> getQueries() {
+ return queries;
+ }
+
+ public List<String> getSources() {
+ return sources;
+ }
+}
diff --git a/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/configuration/Source.java b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/configuration/Source.java
new file mode 100644
index 0000000..b393b37
--- /dev/null
+++ b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/configuration/Source.java
@@ -0,0 +1,27 @@
+package org.apache.plc4x.logstash.configuration;
+
+public class Source {
+ private final String connectionUri;
+ private final String sourceAlias;
+
+ @Override
+ public String toString() {
+ return "Source{" +
+ "connectionUri='" + connectionUri + '\'' +
+ ", sourceAlias='" + sourceAlias + '\'' +
+ '}';
+ }
+
+ public String getConnectionUri() {
+ return connectionUri;
+ }
+
+ public String getSourceAlias() {
+ return sourceAlias;
+ }
+
+ public Source(String connectionUri, String sourceAlias) {
+ this.connectionUri = connectionUri;
+ this.sourceAlias = sourceAlias;
+ }
+}
diff --git a/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java b/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java
index 16db66c..4824e3c 100644
--- a/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java
+++ b/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java
@@ -19,39 +19,35 @@ under the License.
package org.apache.plc4x.logstash;
import co.elastic.logstash.api.Configuration;
-import org.apache.commons.lang3.StringUtils;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
+import org.apache.plc4x.logstash.configuration.Job;
+import org.apache.plc4x.logstash.configuration.Source;
+import org.assertj.core.util.Maps;
+import org.junit.jupiter.api.Test;
import org.logstash.plugins.ConfigurationImpl;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.function.Consumer;
public class Plc4xInputTest {
@Test
- @Ignore
public void testPlc4xInput() {
- String prefix = "This is message";
- long eventCount = 5;
Map<String, Object> configValues = new HashMap<>();
-// configValues.put(Plc4xInput.PREFIX_CONFIG.name(), prefix);
-// configValues.put(Plc4xInput.EVENT_COUNT_CONFIG.name(), eventCount);
+
+ Job job1 = new Job(300, Arrays.asList("testfield=RANDOM/foo:INTEGER"), Arrays.asList("TestConnection"));
+ Source testConnection = new Source("test:hurzpurzfurz", "TestConnection");
+
+ configValues.put(Plc4xInput.SOURCE_CONFIG.name(), Maps.newHashMap(testConnection.getSourceAlias(), testConnection));
+ configValues.put(Plc4xInput.JOB_CONFIG.name(), Maps.newHashMap("job1", job1));
+
+
Configuration config = new ConfigurationImpl(configValues);
Plc4xInput input = new Plc4xInput("test-id", config, null);
TestConsumer testConsumer = new TestConsumer();
input.start(testConsumer);
List<Map<String, Object>> events = testConsumer.getEvents();
- Assert.assertEquals(eventCount, events.size());
- for (int k = 1; k <= events.size(); k++) {
- Assert.assertEquals(prefix + " " + StringUtils.center(k + " of " + eventCount, 20),
- events.get(k - 1).get("message"));
- }
+ System.out.println(events.size());
}
private static class TestConsumer implements Consumer<Map<String, Object>> {