You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2019/08/27 13:58:28 UTC
[plc4x] 40/44: logstash adapter + test
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 0dd5acbff1a9a2699dc00b788215fa082715fefa
Author: Till Voss <ti...@codecentric.de>
AuthorDate: Fri Aug 23 08:15:35 2019 +0200
logstash adapter + test
---
.../main/java/org/apache/plc4x/logstash/Plc4x.java | 57 +++++++++++++---------
.../org/apache/plc4x/logstash/Plc4xInputTest.java | 6 +--
2 files changed, 37 insertions(+), 26 deletions(-)
diff --git a/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4x.java b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4x.java
index 91e2c13..601f8e6 100644
--- a/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4x.java
+++ b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4x.java
@@ -20,7 +20,6 @@ package org.apache.plc4x.logstash;
import co.elastic.logstash.api.*;
import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.scraper.ResultHandler;
import org.apache.plc4x.java.scraper.config.triggeredscraper.JobConfigurationTriggeredImplBuilder;
import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImpl;
import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImplBuilder;
@@ -30,16 +29,15 @@ import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.T
import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollectorImpl;
import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
+import java.util.logging.Logger;
// class name must match plugin name
@LogstashPlugin(name="plc4x")
public class Plc4x implements Input {
+ static Logger logger = Logger.getLogger(Plc4x.class.getName());
public static final PluginConfigSpec<Map<String, Object>> JOB_CONFIG =
PluginConfigSpec.hashSetting("jobs");
@@ -80,8 +78,13 @@ public class Plc4x implements Input {
//TODO: use multiple sources:
for (String sourceName : sources.keySet()) {
- //TODO: check !
- builder.addSource(sourceName, ((String) sources.get(sourceName)));
+ Object o = sources.get(sourceName);
+ if(o instanceof String) {
+ String source = (String)o;
+ builder.addSource(sourceName, source);
+ } else {
+ logger.severe("URL of source " + sourceName + "has the wrong typ!");
+ }
}
for (String jobName : jobs.keySet()) {
@@ -93,21 +96,16 @@ public class Plc4x implements Input {
for (String source : ((List<String>) job.get("sources"))) {
jobBuilder.source(source);
}
- for (String query : ((List<String>) job.get("queries"))) {
- String[] fieldSegments = query.split("=");
- if (fieldSegments.length != 2) {
- System.err.println(String.format("Error in job configuration '%s'. " +
- "The field segment expects a format {field-alias}={field-address}, but got '%s'",
- jobName, query));
- continue;
- }
- String fieldAlias = fieldSegments[0];
- String fieldAddress = fieldSegments[1];
+ Map<String, Object> queries = (Map<String, Object>) job.get("queries");
+ for (String queryName : queries.keySet()) {
+
+ String fieldAlias = queryName;
+ String fieldAddress = (String) queries.get(queryName);
jobBuilder.field(fieldAlias, fieldAddress);
}
jobBuilder.build();
} else {
- System.err.println("Jobs of wrong Type!");
+ logger.severe("Jobs of wrong Type!");
}
}
@@ -115,17 +113,30 @@ public class Plc4x implements Input {
try {
plcDriverManager = new PooledPlcDriverManager();
triggerCollector = new TriggerCollectorImpl(plcDriverManager);
- scraper = new TriggeredScraperImpl(scraperConfig, new ResultHandler() {
- @Override
- public void handle(String jobName, String sourceName, Map<String, Object> results) {
- //TODO: use jobname etc for multiple connections
+ scraper = new TriggeredScraperImpl(scraperConfig, (jobName, sourceName, results) -> {
+
+ //TODO: use jobname etc for multiple connections
+ for (Map.Entry<String, Object> result : results.entrySet()) {
+ // Get field-name and -value from the results.
+ String fieldName = result.getKey();
+ Object fieldValue = result.getValue();
+
+ System.out.println("fieldName: " + fieldName);
+ System.out.println("fieldValue: " + fieldValue);
consumer.accept(results);
}
}, triggerCollector);
scraper.start();
triggerCollector.start();
} catch (ScraperException e) {
- System.err.println("Error starting the scraper: "+ e);
+ logger.severe("Error starting the scraper: "+ e);
+ }
+ while(true) {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
}
diff --git a/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java b/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java
index 024a43b..9cf3b44 100644
--- a/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java
+++ b/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java
@@ -33,14 +33,15 @@ public class Plc4xInputTest {
Map<String, Object> configValues = new HashMap<>();
Map<String, Object> jobValues = new HashMap<>();
- List<String> queries = Arrays.asList("testfield=RANDOM/foo:INTEGER");
+ Map<String,Object> queries = new HashMap<>();
+ queries.put("testfield", "ns=2;i=4");
List<String> sources = Arrays.asList("TestConnection");
jobValues.put("rate", 300);
jobValues.put("queries", queries);
jobValues.put("sources", sources);
- configValues.put(Plc4x.SOURCE_CONFIG.name(), Maps.newHashMap("TestConnection", "test:hurzpurzfurz"));
+ configValues.put(Plc4x.SOURCE_CONFIG.name(), Maps.newHashMap("TestConnection", "opcua:tcp://localhost:4840/freeopcua/server/"));
configValues.put(Plc4x.JOB_CONFIG.name(), Maps.newHashMap("job1", jobValues));
@@ -68,5 +69,4 @@ public class Plc4xInputTest {
return events;
}
}
-
}