You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2019/08/27 13:58:28 UTC

[plc4x] 40/44: logstash adapter + test

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 0dd5acbff1a9a2699dc00b788215fa082715fefa
Author: Till Voss <ti...@codecentric.de>
AuthorDate: Fri Aug 23 08:15:35 2019 +0200

    logstash adapter + test
---
 .../main/java/org/apache/plc4x/logstash/Plc4x.java | 57 +++++++++++++---------
 .../org/apache/plc4x/logstash/Plc4xInputTest.java  |  6 +--
 2 files changed, 37 insertions(+), 26 deletions(-)

diff --git a/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4x.java b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4x.java
index 91e2c13..601f8e6 100644
--- a/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4x.java
+++ b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4x.java
@@ -20,7 +20,6 @@ package org.apache.plc4x.logstash;
 
 import co.elastic.logstash.api.*;
 import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.scraper.ResultHandler;
 import org.apache.plc4x.java.scraper.config.triggeredscraper.JobConfigurationTriggeredImplBuilder;
 import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImpl;
 import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImplBuilder;
@@ -30,16 +29,15 @@ import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.T
 import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollectorImpl;
 import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.CountDownLatch;
 import java.util.function.Consumer;
+import java.util.logging.Logger;
 
 // class name must match plugin name
 @LogstashPlugin(name="plc4x")
 public class Plc4x implements Input {
+    static Logger logger = Logger.getLogger(Plc4x.class.getName());
 
     public static final PluginConfigSpec<Map<String, Object>> JOB_CONFIG =
             PluginConfigSpec.hashSetting("jobs");
@@ -80,8 +78,13 @@ public class Plc4x implements Input {
         //TODO: use multiple sources:
 
         for (String sourceName : sources.keySet()) {
-            //TODO: check !
-            builder.addSource(sourceName, ((String) sources.get(sourceName)));
+            Object o = sources.get(sourceName);
+            if(o instanceof String) {
+                String source = (String)o;
+                builder.addSource(sourceName, source);
+            } else {
+                logger.severe("URL of source " + sourceName + "has the wrong typ!");
+            }
         }
 
         for (String jobName : jobs.keySet()) {
@@ -93,21 +96,16 @@ public class Plc4x implements Input {
                 for (String source : ((List<String>) job.get("sources"))) {
                     jobBuilder.source(source);
                 }
-                for (String query : ((List<String>) job.get("queries"))) {
-                    String[] fieldSegments = query.split("=");
-                    if (fieldSegments.length != 2) {
-                        System.err.println(String.format("Error in job configuration '%s'. " +
-                                "The field segment expects a format {field-alias}={field-address}, but got '%s'",
-                            jobName, query));
-                        continue;
-                    }
-                    String fieldAlias = fieldSegments[0];
-                    String fieldAddress = fieldSegments[1];
+                Map<String, Object> queries = (Map<String, Object>) job.get("queries");
+                for (String queryName : queries.keySet()) {
+
+                    String fieldAlias = queryName;
+                    String fieldAddress = (String) queries.get(queryName);
                     jobBuilder.field(fieldAlias, fieldAddress);
                 }
                 jobBuilder.build();
             } else {
-                System.err.println("Jobs of wrong Type!");
+                logger.severe("Jobs of wrong Type!");
             }
         }
 
@@ -115,17 +113,30 @@ public class Plc4x implements Input {
         try {
             plcDriverManager = new PooledPlcDriverManager();
             triggerCollector = new TriggerCollectorImpl(plcDriverManager);
-            scraper = new TriggeredScraperImpl(scraperConfig, new ResultHandler() {
-                @Override
-                public void handle(String jobName, String sourceName, Map<String, Object> results) {
-                    //TODO: use jobname etc for multiple connections
+            scraper = new TriggeredScraperImpl(scraperConfig, (jobName, sourceName, results) -> {
+
+                //TODO: use jobname etc for multiple connections
+                for (Map.Entry<String, Object> result : results.entrySet()) {
+                    // Get field-name and -value from the results.
+                    String fieldName = result.getKey();
+                    Object fieldValue = result.getValue();
+
+                    System.out.println("fieldName: " + fieldName);
+                    System.out.println("fieldValue: " + fieldValue);
                     consumer.accept(results);
                 }
             }, triggerCollector);
             scraper.start();
             triggerCollector.start();
         } catch (ScraperException e) {
-            System.err.println("Error starting the scraper: "+ e);
+            logger.severe("Error starting the scraper: "+ e);
+        }
+        while(true) {
+            try {
+                Thread.sleep(10000);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
         }
     }
 
diff --git a/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java b/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java
index 024a43b..9cf3b44 100644
--- a/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java
+++ b/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java
@@ -33,14 +33,15 @@ public class Plc4xInputTest {
         Map<String, Object> configValues = new HashMap<>();
         Map<String, Object> jobValues = new HashMap<>();
 
-        List<String> queries = Arrays.asList("testfield=RANDOM/foo:INTEGER");
+        Map<String,Object> queries = new HashMap<>();
+        queries.put("testfield", "ns=2;i=4");
         List<String> sources = Arrays.asList("TestConnection");
 
         jobValues.put("rate", 300);
         jobValues.put("queries", queries);
         jobValues.put("sources", sources);
 
-        configValues.put(Plc4x.SOURCE_CONFIG.name(), Maps.newHashMap("TestConnection", "test:hurzpurzfurz"));
+        configValues.put(Plc4x.SOURCE_CONFIG.name(), Maps.newHashMap("TestConnection", "opcua:tcp://localhost:4840/freeopcua/server/"));
         configValues.put(Plc4x.JOB_CONFIG.name(),  Maps.newHashMap("job1", jobValues));
 
 
@@ -68,5 +69,4 @@ public class Plc4xInputTest {
             return events;
         }
     }
-
 }