You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by tm...@apache.org on 2019/08/11 19:56:51 UTC
[plc4x] 01/01: PLC4X-90: scraper Refactoring *added
TriggerCollector (collect and execute all triggerRequests) *added date
datatypes (DateAndTime,
Date and TimeOfDay) (PLC4X-104) *trigger comparison extended to previous
value instead of only static value (PLC4X-109) *fixed bug for strings
>127chars (PLC4X-141) *improved functionality *fixed several minor bugs
*api modification for scraper usage *renaming
This is an automated email from the ASF dual-hosted git repository.
tmitsch pushed a commit to branch feature/improve-scraper-tim
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 8a4474bb6f802c51d4fe62fa210334f7c8c401a4
Author: julian <j....@pragmaticminds.de>
AuthorDate: Wed May 8 11:07:03 2019 +0200
PLC4X-90: scraper Refactoring
*added TriggerCollector (collect and execute all triggerRequests)
*added date datatypes (DateAndTime,Date and TimeOfDay) (PLC4X-104)
*trigger comparison extended to previous value instead of only static value (PLC4X-109)
*fixed bug for strings >127chars (PLC4X-141)
*improved functionality
*fixed several minor bugs
*api modification for scraper usage
*renaming
---
.../java/org/apache/plc4x/Plc4XBaseTableTest.java | 6 +-
.../org/apache/plc4x/java/s7/model/S7Field.java | 45 +-
.../plc4x/java/s7/netty/Plc4XNettyException.java | 37 +
.../plc4x/java/s7/netty/Plc4XS7Protocol.java | 277 +++++---
.../org/apache/plc4x/java/s7/netty/S7Protocol.java | 45 +-
.../java/s7/netty/model/types/TransportSize.java | 7 +-
.../strategies/DefaultS7MessageProcessor.java | 1 +
plc4j/utils/scraper/README.md | 21 +
.../org/apache/plc4x/java/scraper/ScrapeJob.java | 7 +-
.../apache/plc4x/java/scraper/ScrapeJobImpl.java | 7 +-
.../org/apache/plc4x/java/scraper/Scraper.java | 2 +-
.../apache/plc4x/java/scraper/ScraperTaskImpl.java | 47 +-
.../java/scraper/config/JobConfiguration.java | 7 +-
.../config/JobConfigurationClassicImpl.java | 49 ++
...ava => JobConfigurationClassicImplBuilder.java} | 20 +-
.../java/scraper/config/JobConfigurationImpl.java | 35 +-
.../java/scraper/config/ScraperConfiguration.java | 97 +--
.../config/ScraperConfigurationClassicImpl.java | 86 +++
...=> ScraperConfigurationClassicImplBuilder.java} | 16 +-
.../JobConfigurationTriggeredImpl.java | 47 ++
...a => JobConfigurationTriggeredImplBuilder.java} | 17 +-
...java => ScraperConfigurationTriggeredImpl.java} | 100 +--
... ScraperConfigurationTriggeredImplBuilder.java} | 18 +-
.../TriggeredJobConfiguration.java | 74 --
.../triggeredscraper/TriggeredScrapeJobImpl.java | 3 +-
.../triggeredscraper/TriggeredScraperImpl.java | 298 +++++++--
.../TriggeredScraperMBean.java} | 15 +-
.../triggeredscraper/TriggeredScraperTask.java | 145 ++--
.../TriggeredScraperTaskMBean.java} | 19 +-
.../triggerhandler/TriggerConfiguration.java | 744 +++++++++++++++------
.../triggerhandler/TriggerHandler.java | 112 +---
.../triggerhandler/TriggerHandlerImpl.java | 113 ++++
.../triggerhandler/collector/TriggerCollector.java | 62 ++
.../collector/TriggerCollectorImpl.java | 323 +++++++++
.../java/scraper/ScraperConfigurationTest.java | 21 +-
.../apache/plc4x/java/scraper/ScraperRunner.java | 3 +-
.../plc4x/java/scraper/TriggeredScraperRunner.java | 16 +-
.../java/scraper/TriggeredScraperRunnerModbus.java | 17 +-
.../config/ScraperConfigurationBuilderTest.java | 12 +-
.../triggeredscraper/TriggeredScraperImplTest.java | 118 ++++
.../triggerhandler/TriggerConfigurationTest.java | 65 +-
.../test/resources/example_triggered_scraper.yml | 40 +-
.../scraper/src/test/resources/logback-test.xml | 5 +-
...iggered_scraper.yml => mock-scraper-config.yml} | 33 +-
44 files changed, 2305 insertions(+), 927 deletions(-)
diff --git a/plc4j/integrations/apache-calcite/src/test/java/org/apache/plc4x/Plc4XBaseTableTest.java b/plc4j/integrations/apache-calcite/src/test/java/org/apache/plc4x/Plc4XBaseTableTest.java
index 3bc3007..b031393 100644
--- a/plc4j/integrations/apache-calcite/src/test/java/org/apache/plc4x/Plc4XBaseTableTest.java
+++ b/plc4j/integrations/apache-calcite/src/test/java/org/apache/plc4x/Plc4XBaseTableTest.java
@@ -19,6 +19,7 @@ under the License.
package org.apache.plc4x;
import org.apache.calcite.linq4j.Enumerator;
+import org.apache.plc4x.java.scraper.config.JobConfigurationClassicImpl;
import org.apache.plc4x.java.scraper.config.JobConfigurationImpl;
import org.assertj.core.api.WithAssertions;
import org.junit.jupiter.api.Test;
@@ -33,7 +34,10 @@ class Plc4XBaseTableTest implements WithAssertions {
@Test
void testOnBlockingQueue() {
ArrayBlockingQueue<Plc4xSchema.Record> queue = new ArrayBlockingQueue<>(100);
- Plc4xStreamTable table = new Plc4xStreamTable(queue, new JobConfigurationImpl("job1", 100,
+ Plc4xStreamTable table = new Plc4xStreamTable(queue, new JobConfigurationImpl(
+ "job1",
+ null,
+ 100,
Collections.emptyList(),
Collections.singletonMap("key", "address")));
diff --git a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/model/S7Field.java b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/model/S7Field.java
index 7bced35..7f93a1e 100644
--- a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/model/S7Field.java
+++ b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/model/S7Field.java
@@ -24,6 +24,9 @@ import org.apache.plc4x.java.api.model.PlcField;
import org.apache.plc4x.java.s7.netty.model.types.MemoryArea;
import org.apache.plc4x.java.s7.netty.model.types.TransportSize;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -115,6 +118,12 @@ public class S7Field implements PlcField {
case REAL:
case LREAL:
return Double.class;
+ case DATE_AND_TIME:
+ return LocalDateTime.class;
+ case DATE:
+ return LocalDate.class;
+ case TIME_OF_DAY:
+ return LocalTime.class;
default:
throw new NotImplementedException("The response type for datatype " + dataType + " is not yet implemented");
}
@@ -141,7 +150,7 @@ public class S7Field implements PlcField {
if(matcher.group(NUM_ELEMENTS) != null) {
numElements = Integer.parseInt(matcher.group(NUM_ELEMENTS));
}
- numElements = calcNumberOfElementsForStringTypes(numElements,dataType);
+ numElements = calcNumberOfElementsForIndividualTypes(numElements,dataType);
if(!transferSizeCode.isEmpty() && !dataType.getSizeCode().equals(transferSizeCode)) {
throw new PlcInvalidFieldException("Transfer size code '" + transferSizeCode +
"' doesn't match specified data type '" + dataType.name() + "'");
@@ -166,7 +175,7 @@ public class S7Field implements PlcField {
if(matcher.group(NUM_ELEMENTS) != null) {
numElements = Integer.parseInt(matcher.group(NUM_ELEMENTS));
}
- numElements = calcNumberOfElementsForStringTypes(numElements,dataType);
+ numElements = calcNumberOfElementsForIndividualTypes(numElements,dataType);
if(!transferSizeCode.isEmpty() && !dataType.getSizeCode().equals(transferSizeCode)) {
throw new PlcInvalidFieldException("Transfer size code '" + transferSizeCode +
"' doesn't match specified data type '" + dataType.name() + "'");
@@ -209,17 +218,29 @@ public class S7Field implements PlcField {
* @param dataType detected Transport-Size that represents the data-type
* @return corrected numElements if necessary
*/
- private static int calcNumberOfElementsForStringTypes(int numElements,TransportSize dataType){
- //if no String nothing has to be done
- if(!dataType.equals(TransportSize.STRING)){
- return numElements;
- }
- //on valid String-length add two byte because of S7-representation of Strings
- if(numElements>1 && numElements<=254){
- return numElements+2;
+ private static int calcNumberOfElementsForIndividualTypes(int numElements, TransportSize dataType){
+
+ if(dataType.equals(TransportSize.STRING)){
+ //on valid String-length add two byte because of S7-representation of Strings
+ if(numElements>1 && numElements<=254){
+ return numElements+2;
+ }
+ //connection String usage with "STRING" only --> numElements=1 --> enter default value
+ return 256;
}
- //connection String usage with "STRING" only --> numElements=1 --> enter default value
- return 256;
+ return numElements;
+
}
+ @Override
+ public String toString() {
+ return "S7Field{" +
+ "dataType=" + dataType +
+ ", memoryArea=" + memoryArea +
+ ", blockNumber=" + blockNumber +
+ ", byteOffset=" + byteOffset +
+ ", bitOffset=" + bitOffset +
+ ", numElements=" + numElements +
+ '}';
+ }
}
diff --git a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/Plc4XNettyException.java b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/Plc4XNettyException.java
new file mode 100644
index 0000000..4b370d3
--- /dev/null
+++ b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/Plc4XNettyException.java
@@ -0,0 +1,37 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.plc4x.java.s7.netty;
+
+/**
+ * Exception that is thrown when an error occurs within the netty-based protocol implementations.
+ */
+public class Plc4XNettyException extends RuntimeException{
+
+ /**
+ * Constructs a new runtime exception with the specified detail message.
+ * The cause is not initialized, and may subsequently be initialized by a
+ * call to {@link #initCause}.
+ *
+ * @param message the detail message. The detail message is saved for
+ * later retrieval by the {@link #getMessage()} method.
+ */
+ public Plc4XNettyException(String message) {
+ super(message);
+ }
+}
diff --git a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/Plc4XS7Protocol.java b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/Plc4XS7Protocol.java
index d64ed82..6c789dd 100644
--- a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/Plc4XS7Protocol.java
+++ b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/Plc4XS7Protocol.java
@@ -45,13 +45,20 @@ import org.apache.plc4x.java.s7.netty.model.params.items.VarParameterItem;
import org.apache.plc4x.java.s7.netty.model.payloads.VarPayload;
import org.apache.plc4x.java.s7.netty.model.payloads.items.VarPayloadItem;
import org.apache.plc4x.java.s7.netty.model.types.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Array;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -67,6 +74,7 @@ import java.util.stream.IntStream;
* the {@link PlcRequestContainer}s future with the {@link PlcResponse}.
*/
public class Plc4XS7Protocol extends PlcMessageToMessageCodec<S7Message, PlcRequestContainer> {
+ private static final Logger logger = LoggerFactory.getLogger( Plc4XS7Protocol.class );
private static final AtomicInteger tpduGenerator = new AtomicInteger(10);
@@ -445,87 +453,105 @@ public class Plc4XS7Protocol extends PlcMessageToMessageCodec<S7Message, PlcRequ
BaseDefaultFieldItem fieldItem = null;
ByteBuf data = Unpooled.wrappedBuffer(payloadItem.getData());
if (responseCode == PlcResponseCode.OK) {
- // TODO 2018-09-27 jf: array returning only implemented for BOOL, BYTE, INTEGERS, FP
- // not for CHARS & STRINGS and not for all other bit-strings except for BYTE
- switch (field.getDataType()) {
- // -----------------------------------------
- // Bit
- // -----------------------------------------
- case BOOL:
- fieldItem = decodeReadResponseBitField(field, data);
- break;
- // -----------------------------------------
- // Bit-strings
- // -----------------------------------------
- case BYTE: // 1 byte
- fieldItem = decodeReadResponseByteBitStringField(field, data);
- break;
- case WORD: // 2 byte (16 bit)
- fieldItem = decodeReadResponseShortBitStringField(field, data);
- break;
- case DWORD: // 4 byte (32 bit)
- fieldItem = decodeReadResponseIntegerBitStringField(field, data);
- break;
- case LWORD: // 8 byte (64 bit)
- fieldItem = decodeReadResponseLongBitStringField(field, data);
- break;
- // -----------------------------------------
- // Integers
- // -----------------------------------------
- // 8 bit:
- case SINT:
- fieldItem = decodeReadResponseSignedByteField(field, data);
- break;
- case USINT:
- fieldItem = decodeReadResponseUnsignedByteField(field, data);
- break;
- // 16 bit:
- case INT:
- fieldItem = decodeReadResponseSignedShortField(field, data);
- break;
- case UINT:
- fieldItem = decodeReadResponseUnsignedShortField(field, data);
- break;
- // 32 bit:
- case DINT:
- fieldItem = decodeReadResponseSignedIntegerField(field, data);
- break;
- case UDINT:
- fieldItem = decodeReadResponseUnsignedIntegerField(field, data);
- break;
- // 64 bit:
- case LINT:
- fieldItem = decodeReadResponseSignedLongField(field, data);
- break;
- case ULINT:
- fieldItem = decodeReadResponseUnsignedLongField(field, data);
- break;
- // -----------------------------------------
- // Floating point values
- // -----------------------------------------
- case REAL:
- fieldItem = decodeReadResponseFloatField(field, data);
- break;
- case LREAL:
- fieldItem = decodeReadResponseDoubleField(field, data);
- break;
- // -----------------------------------------
- // Characters & Strings
- // -----------------------------------------
- case CHAR: // 1 byte (8 bit)
- fieldItem = decodeReadResponseFixedLengthStringField(1, false, data);
- break;
- case WCHAR: // 2 byte
- fieldItem = decodeReadResponseFixedLengthStringField(1, true, data);
- break;
- case STRING:
- fieldItem = decodeReadResponseVarLengthStringField(false, data);
- break;
- case WSTRING:
- fieldItem = decodeReadResponseVarLengthStringField(true, data);
- break;
- default:
- throw new PlcProtocolException("Unsupported type " + field.getDataType());
+ try {
+ switch (field.getDataType()) {
+ // -----------------------------------------
+ // Bit
+ // -----------------------------------------
+ case BOOL:
+ fieldItem = decodeReadResponseBitField(field, data);
+ break;
+ // -----------------------------------------
+ // Bit-strings
+ // -----------------------------------------
+ case BYTE: // 1 byte
+ fieldItem = decodeReadResponseByteBitStringField(field, data);
+ break;
+ case WORD: // 2 byte (16 bit)
+ fieldItem = decodeReadResponseShortBitStringField(field, data);
+ break;
+ case DWORD: // 4 byte (32 bit)
+ fieldItem = decodeReadResponseIntegerBitStringField(field, data);
+ break;
+ case LWORD: // 8 byte (64 bit)
+ fieldItem = decodeReadResponseLongBitStringField(field, data);
+ break;
+ // -----------------------------------------
+ // Integers
+ // -----------------------------------------
+ // 8 bit:
+ case SINT:
+ fieldItem = decodeReadResponseSignedByteField(field, data);
+ break;
+ case USINT:
+ fieldItem = decodeReadResponseUnsignedByteField(field, data);
+ break;
+ // 16 bit:
+ case INT:
+ fieldItem = decodeReadResponseSignedShortField(field, data);
+ break;
+ case UINT:
+ fieldItem = decodeReadResponseUnsignedShortField(field, data);
+ break;
+ // 32 bit:
+ case DINT:
+ fieldItem = decodeReadResponseSignedIntegerField(field, data);
+ break;
+ case UDINT:
+ fieldItem = decodeReadResponseUnsignedIntegerField(field, data);
+ break;
+ // 64 bit:
+ case LINT:
+ fieldItem = decodeReadResponseSignedLongField(field, data);
+ break;
+ case ULINT:
+ fieldItem = decodeReadResponseUnsignedLongField(field, data);
+ break;
+ // -----------------------------------------
+ // Floating point values
+ // -----------------------------------------
+ case REAL:
+ fieldItem = decodeReadResponseFloatField(field, data);
+ break;
+ case LREAL:
+ fieldItem = decodeReadResponseDoubleField(field, data);
+ break;
+ // -----------------------------------------
+ // Characters & Strings
+ // -----------------------------------------
+ case CHAR: // 1 byte (8 bit)
+ fieldItem = decodeReadResponseFixedLengthStringField(1, false, data);
+ break;
+ case WCHAR: // 2 byte
+ fieldItem = decodeReadResponseFixedLengthStringField(1, true, data);
+ break;
+ case STRING:
+ fieldItem = decodeReadResponseVarLengthStringField(false, data);
+ break;
+ case WSTRING:
+ fieldItem = decodeReadResponseVarLengthStringField(true, data);
+ break;
+ // -----------------------------------------
+ // TIA Date-Formats
+ // -----------------------------------------
+ case DATE_AND_TIME:
+ fieldItem = decodeReadResponseDateAndTime(field, data);
+ break;
+ case TIME_OF_DAY:
+ fieldItem = decodeReadResponseTimeOfDay(field, data);
+ break;
+ case DATE:
+ fieldItem = decodeReadResponseDate(field, data);
+ break;
+ default:
+ throw new PlcProtocolException("Unsupported type " + field.getDataType());
+ }
+ }
+ catch (Plc4XNettyException e){
+ logger.warn("Problem during casting of field {}: Exception: {}; FieldInformation: {}",fieldName,e.getMessage(),field);
+ }
+ catch (Exception e){
+ logger.warn("Some other error occurred casting field {}, FieldInformation: {}",fieldName, field,e);
}
}
Pair<PlcResponseCode, BaseDefaultFieldItem> result = new ImmutablePair<>(responseCode, fieldItem);
@@ -636,8 +662,39 @@ public class Plc4XS7Protocol extends PlcMessageToMessageCodec<S7Message, PlcRequ
BaseDefaultFieldItem decodeReadResponseVarLengthStringField(boolean isUtf16, ByteBuf data) {
// Max length ... ignored.
data.skipBytes(1);
- byte actualLength = data.readByte();
- return decodeReadResponseFixedLengthStringField(actualLength, isUtf16, data);
+
+ //read the length byte and widen it to an unsigned value inside an int; kept as a signed byte, lengths >127 would turn negative and longer strings would fail
+ byte currentLengthByte = data.readByte();
+ int currentLength = currentLengthByte & 0xFF;
+ return decodeReadResponseFixedLengthStringField(currentLength, isUtf16, data);
+ }
+
+ BaseDefaultFieldItem decodeReadResponseDateAndTime(S7Field field,ByteBuf data) {
+ LocalDateTime[] localDateTimes = readAllValues(LocalDateTime.class,field, i -> readDateAndTime(data));
+ return new DefaultLocalDateTimeFieldItem(localDateTimes);
+ }
+
+ BaseDefaultFieldItem decodeReadResponseTimeOfDay(S7Field field,ByteBuf data) {
+ LocalTime[] localTimes = readAllValues(LocalTime.class,field, i -> readTimeOfDay(data));
+ return new DefaultLocalTimeFieldItem(localTimes);
+ }
+
+ BaseDefaultFieldItem decodeReadResponseDate(S7Field field,ByteBuf data) {
+ LocalDate[] localTimes = readAllValues(LocalDate.class,field, i -> readDate(data));
+ return new DefaultLocalDateFieldItem(localTimes);
+ }
+
+ // Reads the 4 bytes starting at pos as a big-endian 32 bit value; NOTE: the result is a Java int, so values above 2^31-1 are returned as negative numbers
+ public static int getUDIntAt(byte[] buffer, int pos) {
+ int result;
+ result = buffer[pos] & 0x0FF;
+ result <<= 8;
+ result += buffer[pos + 1] & 0x0FF;
+ result <<= 8;
+ result += buffer[pos + 2] & 0x0FF;
+ result <<= 8;
+ result += buffer[pos + 3] & 0x0FF;
+ return result;
}
private static <T> T[] readAllValues(Class<T> clazz, S7Field field, Function<Integer, T> extract) {
@@ -730,4 +787,58 @@ public class Plc4XS7Protocol extends PlcMessageToMessageCodec<S7Message, PlcRequ
return new BigInteger(bytes);
}
+ LocalDateTime readDateAndTime(ByteBuf data) {
+ //per definition for Date_And_Time only the first 6 bytes are used
+
+ int year=convertByteToBcd(data.readByte());
+ int month=convertByteToBcd(data.readByte());
+ int day=convertByteToBcd(data.readByte());
+ int hour=convertByteToBcd(data.readByte());
+ int minute=convertByteToBcd(data.readByte());
+ int second=convertByteToBcd(data.readByte());
+ //skip the last 2 bytes no information present
+ data.readByte();
+ data.readByte();
+
+ //data-type ranges from 1990 up to 2089
+ if(year>=90){
+ year+=1900;
+ }
+ else{
+ year+=2000;
+ }
+
+ return LocalDateTime.of(year,month,day,hour,minute,second);
+ }
+
+ LocalTime readTimeOfDay(ByteBuf data) {
+ //Time_Of_Day is read as a 4 byte integer holding the milliseconds since midnight
+
+ int millisSinsMidnight = data.readInt();
+
+
+ return LocalTime.now().withHour(0).withMinute(0).withSecond(0).withNano(0).plus(millisSinsMidnight, ChronoUnit.MILLIS);
+
+ }
+
+ LocalDate readDate(ByteBuf data) {
+ //Date is read as an unsigned 2 byte value holding the number of days since 1990-01-01
+
+ int daysSince1990 = data.readUnsignedShort();
+
+ System.out.println(daysSince1990);
+ return LocalDate.now().withYear(1990).withDayOfMonth(1).withMonth(1).plus(daysSince1990, ChronoUnit.DAYS);
+
+ }
+
+ /**
+ * converts incoming byte to an integer regarding used BCD format
+ * @param incomingByte
+ * @return converted BCD number
+ */
+ private static int convertByteToBcd(byte incomingByte) {
+ int dec = (incomingByte >> 4) * 10;
+ return dec + (incomingByte & 0x0f);
+ }
+
}
diff --git a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/S7Protocol.java b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/S7Protocol.java
index 17214ca..6bc19b0 100644
--- a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/S7Protocol.java
+++ b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/S7Protocol.java
@@ -393,7 +393,8 @@ public class S7Protocol extends ChannelDuplexHandler {
buf.writeByte((byte) 0x0a);
buf.writeByte(s7AnyRequestItem.getAddressingMode().getCode());
buf.writeByte(s7AnyRequestItem.getDataType().getTypeCode());
- buf.writeShort(s7AnyRequestItem.getNumElements());
+
+ buf.writeShort(encodeNumElements(s7AnyRequestItem));
buf.writeShort(s7AnyRequestItem.getDataBlockNumber());
buf.writeByte(s7AnyRequestItem.getMemoryArea().getCode());
// A S7 address is 3 bytes long. Unfortunately the byte-offset is NOT located in
@@ -407,6 +408,22 @@ public class S7Protocol extends ChannelDuplexHandler {
| (s7AnyRequestItem.getBitOffset() & 0x07)));
}
+ /**
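+ * Workaround for the date and time types: for DATE_AND_TIME, TIME_OF_DAY and DATE the element count is encoded as a byte count (numElements * size in bytes of the data type); all other types use the plain element count.
+ * @return the value to be written into the number-of-elements field of the request item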
+ */
+ private short encodeNumElements(S7AnyVarParameterItem s7AnyVarParameterItem){
+ switch (s7AnyVarParameterItem.getDataType()){
+ case DATE_AND_TIME:
+ case TIME_OF_DAY:
+ case DATE:
+ return (short) (s7AnyVarParameterItem.getNumElements()*s7AnyVarParameterItem.getDataType().getSizeInBytes());
+ default:
+ return (short) s7AnyVarParameterItem.getNumElements();
+ }
+
+ }
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Decoding
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -528,8 +545,10 @@ public class S7Protocol extends ChannelDuplexHandler {
maxAmqCallee = setupCommunicationParameter.getMaxAmqCallee();
pduSize = setupCommunicationParameter.getPduLength();
- logger.info("S7Connection established pdu-size {}, max-amq-caller {}, " +
+ if(logger.isInfoEnabled()) {
+ logger.info("S7Connection established pdu-size {}, max-amq-caller {}, " +
"max-amq-callee {}", pduSize, maxAmqCaller, maxAmqCallee);
+ }
// Only if the controller type is set to "ANY", then try to identify the PLC type.
if(controllerType == S7ControllerType.ANY) {
@@ -544,11 +563,14 @@ public class S7Protocol extends ChannelDuplexHandler {
}
// If a concrete type was specified, then we're done here.
else {
+ if(logger.isDebugEnabled()) {
+ logger.debug("Successfully connected to S7: {}", controllerType.name());
+ logger.debug("- max amq caller: {}", maxAmqCaller);
+ logger.debug("- max amq callee: {}", maxAmqCallee);
+ logger.debug("- pdu size: {}", pduSize);
+ }
if(logger.isInfoEnabled()) {
- logger.info(String.format("Successfully connected to S7: %s", controllerType.name()));
- logger.info(String.format("- max amq caller: %s", maxAmqCaller));
- logger.info(String.format("- max amq callee: %s", maxAmqCallee));
- logger.info(String.format("- pdu size: %s", pduSize));
+ logger.info("Successfully connected to S7: {} wit PDU {}", controllerType.name(),pduSize);
}
// Send an event that connection setup is complete.
@@ -567,11 +589,14 @@ public class S7Protocol extends ChannelDuplexHandler {
}
}
}
+ if(logger.isDebugEnabled()) {
+ logger.debug("Successfully connected to S7: {}", controllerType.name());
+ logger.debug("- max amq caller: {}", maxAmqCaller);
+ logger.debug("- max amq callee: {}", maxAmqCallee);
+ logger.debug("- pdu size: {}", pduSize);
+ }
if(logger.isInfoEnabled()) {
- logger.info(String.format("Successfully connected to S7: %s", controllerType.name()));
- logger.info(String.format("- max amq caller: %s", maxAmqCaller));
- logger.info(String.format("- max amq callee: %s", maxAmqCallee));
- logger.info(String.format("- pdu size: %s", pduSize));
+ logger.info("Successfully connected to S7: {} wit PDU {}", controllerType.name(),pduSize);
}
// Send an event that connection setup is complete.
diff --git a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/types/TransportSize.java b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/types/TransportSize.java
index 98f35d6..323d0a1 100644
--- a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/types/TransportSize.java
+++ b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/model/types/TransportSize.java
@@ -83,19 +83,18 @@ public enum TransportSize {
// Date
// -----------------------------------------
// IEC date (yyyy-m-d)
- // TODO: Find the code
- DATE(0x00, "X", 2, null, null, S7ControllerType.ANY),
+ DATE(0x02, "X", 2, null, DataTransportSize.BYTE_WORD_DWORD, S7ControllerType.ANY),
// -----------------------------------------
// Time of day
// -----------------------------------------
// Time (hh:mm:ss.S)
- TIME_OF_DAY(0x0A, "X", 4, null, null, S7ControllerType.ANY),
+ TIME_OF_DAY(0x02, "X", 4, null, DataTransportSize.BYTE_WORD_DWORD,S7ControllerType.ANY),
// -----------------------------------------
// Date and time of day
// -----------------------------------------
- DATE_AND_TIME(0x0F, "X", 8, null, null, S7ControllerType.ANY),
+ DATE_AND_TIME(0x02, "X", 8,null, null, S7ControllerType.S7_1500, S7ControllerType.S7_300, S7ControllerType.S7_400),
// -----------------------------------------
// ASCII Strings
diff --git a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/strategies/DefaultS7MessageProcessor.java b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/strategies/DefaultS7MessageProcessor.java
index a89bd93..cbb3243 100644
--- a/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/strategies/DefaultS7MessageProcessor.java
+++ b/plc4j/protocols/s7/src/main/java/org/apache/plc4x/java/s7/netty/strategies/DefaultS7MessageProcessor.java
@@ -405,6 +405,7 @@ public class DefaultS7MessageProcessor implements S7MessageProcessor {
if(requestItem.getNumElements() != responseParameterItem.getNumElements()) {
int itemSizeInBytes = requestItem.getDataType().getSizeInBytes();
int totalSizeInBytes = requestItem.getNumElements() * itemSizeInBytes;
+
if(varParameter.getType() == ParameterType.READ_VAR) {
byte[] data = new byte[totalSizeInBytes];
System.arraycopy(responsePayloadItem.getData(), 0, data, 0, responsePayloadItem.getData().length);
diff --git a/plc4j/utils/scraper/README.md b/plc4j/utils/scraper/README.md
new file mode 100644
index 0000000..1c75f33
--- /dev/null
+++ b/plc4j/utils/scraper/README.md
@@ -0,0 +1,21 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+# Usage of (Triggered) Scraper
+
+ToDo: Write this!
\ No newline at end of file
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/ScrapeJob.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/ScrapeJob.java
index dd46f5f..6cde8d6 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/ScrapeJob.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/ScrapeJob.java
@@ -19,12 +19,15 @@
package org.apache.plc4x.java.scraper;
+import org.apache.plc4x.java.scraper.config.JobConfigurationClassicImpl;
+import org.apache.plc4x.java.scraper.config.ScraperConfigurationClassicImpl;
+
import java.util.Map;
/**
* POJO Object to transport all Job information.
- * Is generated from {@link org.apache.plc4x.java.scraper.config.ScraperConfiguration} by
- * merging the sources and the {@link org.apache.plc4x.java.scraper.config.JobConfigurationImpl}.
+ * Is generated from {@link ScraperConfigurationClassicImpl} by
+ * merging the sources and the {@link JobConfigurationClassicImpl}.
*/
public interface ScrapeJob {
String getJobName();
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/ScrapeJobImpl.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/ScrapeJobImpl.java
index b541e47..9b0c181 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/ScrapeJobImpl.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/ScrapeJobImpl.java
@@ -19,14 +19,15 @@
package org.apache.plc4x.java.scraper;
-import org.apache.plc4x.java.scraper.config.JobConfigurationImpl;
+import org.apache.plc4x.java.scraper.config.JobConfigurationClassicImpl;
+import org.apache.plc4x.java.scraper.config.ScraperConfigurationClassicImpl;
import java.util.Map;
/**
* POJO Object to transport all Job information.
- * Is generated from {@link org.apache.plc4x.java.scraper.config.ScraperConfiguration} by
- * merging the sources and the {@link JobConfigurationImpl}.
+ * Is generated from {@link ScraperConfigurationClassicImpl} by
+ * merging the sources and the {@link JobConfigurationClassicImpl}.
*
* @deprecated Scraper is deprecated please use {@link org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScrapeJobImpl} instead all functions are supplied as well see java-doc of {@link org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl}
*/
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
index cd9ab7f..0fd7a2a 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/Scraper.java
@@ -22,7 +22,7 @@ package org.apache.plc4x.java.scraper;
/**
* Main interface that orchestrates scraping.
*/
-public interface Scraper {
+public interface Scraper{
/**
* Start the scraping.
*/
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/ScraperTaskImpl.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/ScraperTaskImpl.java
index f15ee84..4c5a270 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/ScraperTaskImpl.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/ScraperTaskImpl.java
@@ -29,7 +29,7 @@ import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.apache.plc4x.java.api.types.PlcResponseCode;
-import org.apache.plc4x.java.scraper.config.JobConfigurationImpl;
+import org.apache.plc4x.java.scraper.config.JobConfigurationClassicImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +45,7 @@ import java.util.stream.Collectors;
/**
* Plc Scraper Task that scrapes one source.
* One {@link ScrapeJobImpl} gets split into multiple tasks.
- * One task for each source that is defined in the {@link JobConfigurationImpl}.
+ * One task for each source that is defined in the {@link JobConfigurationClassicImpl}.
*
* @deprecated Scraper is deprecated please use {@link org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScrapeJobImpl} instead all functions are supplied as well see java-doc of {@link org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl}
*/
@@ -93,10 +93,13 @@ public class ScraperTaskImpl implements ScraperTask {
this.resultHandler = resultHandler;
}
+
@Override
public void run() {
// Does a single fetch
- LOGGER.trace("Start new scrape of task of job {} for connection {}", jobName, connectionAlias);
+ if(LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Start new scrape of task of job {} for connection {}", jobName, connectionAlias);
+ }
requestCounter.incrementAndGet();
StopWatch stopWatch = new StopWatch();
stopWatch.start();
@@ -111,15 +114,17 @@ public class ScraperTaskImpl implements ScraperTask {
}
}, handlerService);
connection = future.get(10*requestTimeoutMs, TimeUnit.MILLISECONDS);
- LOGGER.trace("Connection to {} established: {}", connectionString, connection);
- PlcReadResponse response;
+ LOGGER.debug("Connection to {} established: {}", connectionString, connection);
+ PlcReadResponse plcReadResponse;
try {
- PlcReadRequest.Builder builder = connection.readRequestBuilder();
- fields.forEach((alias,qry) -> {
+ //build read request
+ PlcReadRequest.Builder readRequestBuilder = connection.readRequestBuilder();
+ //add fields to be acquired to builder
+ fields.forEach((alias, qry) -> {
LOGGER.trace("Requesting: {} -> {}", alias, qry);
- builder.addItem(alias,qry);
+ readRequestBuilder.addItem(alias, qry);
});
- response = builder
+ plcReadResponse = readRequestBuilder
.build()
.execute()
.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
@@ -128,17 +133,20 @@ public class ScraperTaskImpl implements ScraperTask {
handleException(e);
return;
}
- // Add statistics
+
+ LOGGER.debug("Performing statistics");
+ // Add some statistics
stopWatch.stop();
latencyStatistics.addValue(stopWatch.getNanoTime());
failedStatistics.addValue(0.0);
successCounter.incrementAndGet();
// Validate response
- validateResponse(response);
+ validateResponse(plcReadResponse);
+
// Handle response (Async)
- CompletableFuture.runAsync(() -> resultHandler.handle(jobName, connectionAlias, transformResponseToMap(response)), handlerService);
+ CompletableFuture.runAsync(() -> resultHandler.handle(jobName, connectionAlias, transformResponseToMap(plcReadResponse)), handlerService);
} catch (Exception e) {
- LOGGER.debug("Exception during scrape", e);
+ LOGGER.warn("Exception during scraping of Job {}, Connection-Alias {}: Error-message: {} - for stack-trace change logging to DEBUG", jobName,connectionAlias,e.getMessage());
handleException(e);
} finally {
if (connection != null) {
@@ -151,6 +159,10 @@ public class ScraperTaskImpl implements ScraperTask {
}
}
+ /**
+ * Validates the read response by checking for fields whose response code is not OK
+ * @param response acquired response
+ */
private void validateResponse(PlcReadResponse response) {
Map<String, PlcResponseCode> failedFields = response.getFieldNames().stream()
.filter(name -> !PlcResponseCode.OK.equals(response.getResponseCode(name)))
@@ -163,6 +175,11 @@ public class ScraperTaskImpl implements ScraperTask {
}
}
+ /**
+ * transforms the read-response to a Map of String (Key) and Object(Value)
+ * @param response response from PLC
+ * @return transformed Map
+ */
private Map<String, Object> transformResponseToMap(PlcReadResponse response) {
return response.getFieldNames().stream()
.collect(Collectors.toMap(
@@ -203,7 +220,9 @@ public class ScraperTaskImpl implements ScraperTask {
@Override
public void handleException(Exception e) {
- LOGGER.debug("Exception: ", e);
+ if(LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Detailed exception occurred at scraping", e);
+ }
failedStatistics.addValue(1.0);
}
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfiguration.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfiguration.java
index 98b5b79..9c84b0b 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfiguration.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfiguration.java
@@ -23,11 +23,16 @@ import java.util.List;
import java.util.Map;
/**
- * Created by timbo on 2019-03-05
+ * basic interface for the configuration of a scrape job
*/
public interface JobConfiguration {
String getName();
+ String getTriggerConfig();
+
+ @Deprecated
+ Integer getScrapeRate();
+
List<String> getSources();
Map<String, String> getFields();
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfigurationClassicImpl.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfigurationClassicImpl.java
new file mode 100644
index 0000000..6688363
--- /dev/null
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfigurationClassicImpl.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.scraper.config;
+
+import org.apache.plc4x.java.scraper.ScrapeJobImpl;
+import org.apache.plc4x.java.scraper.config.triggeredscraper.JobConfigurationTriggeredImplBuilder;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Configuration for one {@link ScrapeJobImpl} in the {@link ScraperConfigurationClassicImpl}.
+ *
+ * @deprecated Scraper is deprecated; please use {@link JobConfigurationTriggeredImplBuilder} instead. All functions are supplied as well, see the Javadoc of {@link org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl}.
+ */
+@Deprecated
+public class JobConfigurationClassicImpl extends JobConfigurationImpl {
+
+
+ /**
+ * Default constructor
+ *
+ * @param name Job Name / identifier
+ * @param triggerConfig configuration string for triggered jobs
+ * @param scrapeRate scrape rate in ms
+ * @param sources source alias (<b>not</b> the connection string, but the alias from {@link ScraperConfigurationClassicImpl}).
+ * @param fields Map from field alias (how it is named in the result map) to plc4x field query
+ */
+ public JobConfigurationClassicImpl(String name, String triggerConfig, Integer scrapeRate, List<String> sources, Map<String, String> fields) {
+ super(name, triggerConfig, scrapeRate, sources, fields);
+ }
+}
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfigurationImplBuilder.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfigurationClassicImplBuilder.java
similarity index 62%
rename from plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfigurationImplBuilder.java
rename to plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfigurationClassicImplBuilder.java
index 418d2fb..bbbcb2a 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfigurationImplBuilder.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfigurationClassicImplBuilder.java
@@ -19,45 +19,47 @@
package org.apache.plc4x.java.scraper.config;
+import org.apache.plc4x.java.scraper.config.triggeredscraper.JobConfigurationTriggeredImpl;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
- * @deprecated Scraper is deprecated please use {@link org.apache.plc4x.java.scraper.config.triggeredscraper.TriggeredJobConfiguration} instead all functions are supplied as well see java-doc of {@link org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl}
+ * @deprecated Scraper is deprecated; please use {@link JobConfigurationTriggeredImpl} instead. All functions are supplied as well, see the Javadoc of {@link org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl}.
*/
@Deprecated
-public class JobConfigurationImplBuilder {
+public class JobConfigurationClassicImplBuilder {
- private final ScraperConfigurationBuilder parent;
+ private final ScraperConfigurationClassicImplBuilder parent;
private final String name;
private final int scrapeRateMs;
private final List<String> sources = new ArrayList<>();
private final Map<String, String> fields = new HashMap<>();
- public JobConfigurationImplBuilder(ScraperConfigurationBuilder parent, String name, int scrapeRateMs) {
+ public JobConfigurationClassicImplBuilder(ScraperConfigurationClassicImplBuilder parent, String name, int scrapeRateMs) {
this.parent = parent;
this.name = name;
this.scrapeRateMs = scrapeRateMs;
}
- public JobConfigurationImplBuilder source(String alias) {
+ public JobConfigurationClassicImplBuilder source(String alias) {
this.sources.add(alias);
return this;
}
- public JobConfigurationImplBuilder field(String alias, String fieldQuery) {
+ public JobConfigurationClassicImplBuilder field(String alias, String fieldQuery) {
this.fields.put(alias, fieldQuery);
return this;
}
- private JobConfigurationImpl buildInternal() {
- return new JobConfigurationImpl(name, scrapeRateMs, sources, fields);
+ private JobConfigurationClassicImpl buildInternal() {
+ return new JobConfigurationClassicImpl(name,null, scrapeRateMs, sources, fields);
}
- public ScraperConfigurationBuilder build() {
+ public ScraperConfigurationClassicImplBuilder build() {
parent.addJobConfiguration(this.buildInternal());
return this.parent;
}
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfigurationImpl.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfigurationImpl.java
index 044a93b..81438b9 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfigurationImpl.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfigurationImpl.java
@@ -21,37 +21,36 @@ package org.apache.plc4x.java.scraper.config;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.plc4x.java.scraper.ScrapeJobImpl;
+import org.apache.plc4x.java.api.exceptions.PlcNotImplementedException;
import java.util.List;
import java.util.Map;
/**
- * Configuration for one {@link ScrapeJobImpl} in the @{@link ScraperConfiguration}.
- *
- * @deprecated Scraper is deprecated please use {@link org.apache.plc4x.java.scraper.config.triggeredscraper.TriggeredJobConfigurationBuilder} instead all functions are supplied as well see java-doc of {@link org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl}
+ * Base implementation of {@link JobConfiguration} holding the settings of a scrape job
*/
-@Deprecated
public class JobConfigurationImpl implements JobConfiguration {
-
- private final String name;
- private final int scrapeRate;
- private final List<String> sources;
- private final Map<String, String> fields;
+ protected final String name;
+ protected final String triggerConfig;
+ protected final Integer scrapeRate;
+ protected final List<String> sources;
+ protected final Map<String, String> fields;
/**
* Default constructor
* @param name Job Name / identifier
- * @param scrapeRate Scrape rate in ms
- * @param sources source alias (<b>not</b> connection string but the alias (from @{@link ScraperConfiguration}).
+ * @param triggerConfig configuration string for triggered jobs
+ * @param sources source alias (<b>not</b> the connection string, but the alias from {@link ScraperConfigurationClassicImpl}).
* @param fields Map from field alias (how it is named in the result map) to plc4x field query
*/
@JsonCreator
public JobConfigurationImpl(@JsonProperty(value = "name", required = true) String name,
- @JsonProperty(value = "scrapeRate", required = true) int scrapeRate,
+ @JsonProperty(value = "triggerConfig") String triggerConfig,
+ @JsonProperty(value = "scrapeRate") Integer scrapeRate,
@JsonProperty(value = "sources", required = true) List<String> sources,
@JsonProperty(value = "fields", required = true) Map<String, String> fields) {
this.name = name;
+ this.triggerConfig = triggerConfig;
this.scrapeRate = scrapeRate;
this.sources = sources;
this.fields = fields;
@@ -62,8 +61,9 @@ public class JobConfigurationImpl implements JobConfiguration {
return name;
}
- public int getScrapeRate() {
- return scrapeRate;
+ @Override
+ public String getTriggerConfig() {
+ return triggerConfig;
}
@Override
@@ -75,4 +75,9 @@ public class JobConfigurationImpl implements JobConfiguration {
public Map<String, String> getFields() {
return fields;
}
+
+ @Override
+ public Integer getScrapeRate() {
+ return scrapeRate;
+ }
}
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/ScraperConfiguration.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/ScraperConfiguration.java
index 9b0804b..c3cb2da 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/ScraperConfiguration.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/ScraperConfiguration.java
@@ -19,85 +19,43 @@
package org.apache.plc4x.java.scraper.config;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.MismatchedInputException;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.apache.plc4x.java.scraper.ScrapeJob;
-import org.apache.plc4x.java.scraper.ScrapeJobImpl;
import org.apache.plc4x.java.scraper.exception.ScraperConfigurationException;
import org.apache.plc4x.java.scraper.exception.ScraperException;
-import org.apache.plc4x.java.scraper.ScraperImpl;
-import org.apache.plc4x.java.scraper.config.triggeredscraper.TriggeredJobConfiguration;
-import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScrapeJobImpl;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
/**
- * Configuration class for {@link ScraperImpl}.
- *
- * @deprecated Scraper is deprecated please use {@link org.apache.plc4x.java.scraper.config.triggeredscraper.TriggeredScraperConfiguration} instead all functions are supplied as well see java-doc of {@link org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl}
+ * interface for basic configuration of scraper
*/
-@Deprecated
-public class ScraperConfiguration {
-
- private final Map<String, String> sources;
- private final List<JobConfigurationImpl> jobConfigurations;
-
- /**
- * Default constructor.
- *
- * @param sources Map from connection alias to connection string
- * @param jobConfigurations List of configurations one for each Job
- */
- @JsonCreator
- public ScraperConfiguration(@JsonProperty(value = "sources", required = true) Map<String, String> sources,
- @JsonProperty(value = "jobs", required = true) List<JobConfigurationImpl> jobConfigurations) {
- checkNoUnreferencedSources(sources, jobConfigurations);
- // TODO Warning on too many sources?!
- this.sources = sources;
- this.jobConfigurations = jobConfigurations;
- }
-
- private void checkNoUnreferencedSources(Map<String, String> sources, List<JobConfigurationImpl> jobConfigurations) {
- Set<String> unreferencedSources = jobConfigurations.stream()
- .flatMap(job -> job.getSources().stream())
- .filter(source -> !sources.containsKey(source))
- .collect(Collectors.toSet());
- if (!unreferencedSources.isEmpty()) {
- throw new ScraperConfigurationException("There are the following unreferenced sources: " + unreferencedSources);
- }
- }
-
- public static ScraperConfiguration fromYaml(String yaml) {
+public interface ScraperConfiguration {
+ static <T>T fromYaml(String yaml, Class<T> clazz) {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
try {
- return mapper.readValue(yaml, ScraperConfiguration.class);
+ return mapper.readValue(yaml, clazz);
} catch (IOException e) {
throw new ScraperConfigurationException("Unable to parse given yaml configuration!", e);
}
}
- public static ScraperConfiguration fromJson(String json) {
+ static <T>T fromJson(String json, Class<T> clazz) {
ObjectMapper mapper = new ObjectMapper(new JsonFactory());
try {
- return mapper.readValue(json, ScraperConfiguration.class);
+ return mapper.readValue(json, clazz);
} catch (IOException e) {
throw new ScraperConfigurationException("Unable to parse given json configuration!", e);
}
}
- public static ScraperConfiguration fromFile(String path) throws IOException {
+ static <T>T fromFile(String path, Class<T> clazz) throws IOException {
ObjectMapper mapper;
if (path.endsWith("json")) {
mapper = new ObjectMapper(new JsonFactory());
@@ -107,7 +65,7 @@ public class ScraperConfiguration {
throw new ScraperConfigurationException("Only files with extensions json, yml or yaml can be read");
}
try {
- return mapper.readValue(new File(path), ScraperConfiguration.class);
+ return mapper.readValue(new File(path), clazz);
} catch (FileNotFoundException e) {
throw new ScraperConfigurationException("Unable to find configuration given configuration file at '" + path + "'", e);
} catch (MismatchedInputException e) {
@@ -115,42 +73,9 @@ public class ScraperConfiguration {
}
}
- public Map<String, String> getSources() {
- return sources;
- }
+ Map<String, String> getSources();
- public List<JobConfigurationImpl> getJobConfigurations() {
- return jobConfigurations;
- }
+ List<JobConfigurationImpl> getJobConfigurations();
- public List<ScrapeJob> getJobs() throws ScraperException {
- List<ScrapeJob> scrapeJobs = new ArrayList<>();
- for(JobConfiguration jobConfiguration:jobConfigurations){
- if(jobConfiguration instanceof JobConfigurationImpl){
- JobConfigurationImpl jobConfigurationImpl = (JobConfigurationImpl)jobConfiguration;
- scrapeJobs.add(new ScrapeJobImpl(jobConfiguration.getName(),
- jobConfigurationImpl.getScrapeRate(),
- getSourcesForAliases(jobConfiguration.getSources()),
- jobConfiguration.getFields()));
- }
- else{
- if(jobConfiguration instanceof TriggeredJobConfiguration){
- TriggeredJobConfiguration triggeredJobConfiguration = (TriggeredJobConfiguration) jobConfiguration;
- scrapeJobs.add(new TriggeredScrapeJobImpl(jobConfiguration.getName(),
- triggeredJobConfiguration.getTriggerConfig(),
- getSourcesForAliases(jobConfiguration.getSources()),
- jobConfiguration.getFields()));
- }
- }
- }
- return scrapeJobs;
- }
-
- private Map<String, String> getSourcesForAliases(List<String> aliases) {
- return aliases.stream()
- .collect(Collectors.toMap(
- Function.identity(),
- sources::get
- ));
- }
+ List<ScrapeJob> getJobs() throws ScraperException;
}
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/ScraperConfigurationClassicImpl.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/ScraperConfigurationClassicImpl.java
new file mode 100644
index 0000000..aafa373
--- /dev/null
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/ScraperConfigurationClassicImpl.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.scraper.config;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.plc4x.java.scraper.ScrapeJob;
+import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImpl;
+import org.apache.plc4x.java.scraper.exception.ScraperConfigurationException;
+import org.apache.plc4x.java.scraper.exception.ScraperException;
+import org.apache.plc4x.java.scraper.ScraperImpl;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Configuration class for {@link ScraperImpl}.
+ *
+ * @deprecated Scraper is deprecated; please use {@link ScraperConfigurationTriggeredImpl} instead. All functions are supplied as well, see the Javadoc of {@link org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl}.
+ */
+@Deprecated
+public class ScraperConfigurationClassicImpl implements ScraperConfiguration {
+
+ private final Map<String, String> sources;
+ private final List<JobConfigurationImpl> jobConfigurations;
+
+ /**
+ * Default constructor.
+ *
+ * @param sources Map from connection alias to connection string
+ * @param jobConfigurations List of configurations one for each Job
+ */
+ @JsonCreator
+ public ScraperConfigurationClassicImpl(@JsonProperty(value = "sources", required = true) Map<String, String> sources,
+ @JsonProperty(value = "jobs", required = true) List<JobConfigurationImpl> jobConfigurations) {
+ checkNoUnreferencedSources(sources, jobConfigurations);
+ this.sources = sources;
+ this.jobConfigurations = jobConfigurations;
+ }
+
+ private void checkNoUnreferencedSources(Map<String, String> sources, List<JobConfigurationImpl> jobConfigurations) {
+ Set<String> unreferencedSources = jobConfigurations.stream()
+ .flatMap(job -> job.getSources().stream())
+ .filter(source -> !sources.containsKey(source))
+ .collect(Collectors.toSet());
+ if (!unreferencedSources.isEmpty()) {
+ throw new ScraperConfigurationException("There are the following unreferenced sources: " + unreferencedSources);
+ }
+ }
+
+ @Override
+ public Map<String, String> getSources() {
+ return sources;
+ }
+
+ @Override
+ public List<JobConfigurationImpl> getJobConfigurations() {
+ return jobConfigurations;
+ }
+
+ @Override
+ public List<ScrapeJob> getJobs() throws ScraperException {
+ return ScraperConfigurationTriggeredImpl.getJobs(jobConfigurations,sources);
+ }
+
+
+}
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/ScraperConfigurationBuilder.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/ScraperConfigurationClassicImplBuilder.java
similarity index 61%
rename from plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/ScraperConfigurationBuilder.java
rename to plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/ScraperConfigurationClassicImplBuilder.java
index f0cb3b9..e64a009 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/ScraperConfigurationBuilder.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/ScraperConfigurationClassicImplBuilder.java
@@ -19,34 +19,36 @@
package org.apache.plc4x.java.scraper.config;
+import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImplBuilder;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
- * @deprecated Scraper is deprecated please use {@link org.apache.plc4x.java.scraper.config.triggeredscraper.TriggeredScraperConfigurationBuilder} instead all functions are supplied as well see java-doc of {@link org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl}
+ * @deprecated Scraper is deprecated; please use {@link ScraperConfigurationTriggeredImplBuilder} instead. All functions are supplied as well, see the Javadoc of {@link org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl}.
*/
@Deprecated
-public class ScraperConfigurationBuilder {
+public class ScraperConfigurationClassicImplBuilder {
private final Map<String, String> sources = new HashMap<>();
private final List<JobConfigurationImpl> jobConfigurations = new ArrayList<>();
- public ScraperConfigurationBuilder addSource(String alias, String connectionString) {
+ public ScraperConfigurationClassicImplBuilder addSource(String alias, String connectionString) {
sources.put(alias, connectionString);
return this;
}
- public JobConfigurationImplBuilder job(String name, int scrapeRateMs) {
- return new JobConfigurationImplBuilder(this, name, scrapeRateMs);
+ public JobConfigurationClassicImplBuilder job(String name, int scrapeRateMs) {
+ return new JobConfigurationClassicImplBuilder(this, name, scrapeRateMs);
}
public ScraperConfiguration build() {
- return new ScraperConfiguration(sources, jobConfigurations);
+ return new ScraperConfigurationClassicImpl(sources, jobConfigurations);
}
- public void addJobConfiguration(JobConfigurationImpl configuration) {
+ public void addJobConfiguration(JobConfigurationClassicImpl configuration) {
this.jobConfigurations.add(configuration);
}
}
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/JobConfigurationTriggeredImpl.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/JobConfigurationTriggeredImpl.java
new file mode 100644
index 0000000..48d850c
--- /dev/null
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/JobConfigurationTriggeredImpl.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.scraper.config.triggeredscraper;
+
+
+import org.apache.plc4x.java.scraper.config.JobConfigurationImpl;
+import org.apache.plc4x.java.scraper.config.ScraperConfigurationClassicImpl;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Configuration for one {@link org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScrapeJobImpl} in the {@link ScraperConfigurationTriggeredImpl}.
+ */
+public class JobConfigurationTriggeredImpl extends JobConfigurationImpl {
+
+
+ /**
+ * Default constructor
+ *
+ * @param name Job Name / identifier
+ * @param triggerConfig configuration string for triggered jobs
+ * @param scrapeRate scrape rate in ms; may be {@code null}
+ * @param sources source alias (<b>not</b> the connection string, but the alias from {@link ScraperConfigurationClassicImpl}).
+ * @param fields Map from field alias (how it is named in the result map) to plc4x field query
+ */
+ public JobConfigurationTriggeredImpl(String name, String triggerConfig, Integer scrapeRate, List<String> sources, Map<String, String> fields) {
+ super(name, triggerConfig, scrapeRate, sources, fields);
+ }
+}
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/TriggeredJobConfigurationBuilder.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/JobConfigurationTriggeredImplBuilder.java
similarity index 76%
rename from plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/TriggeredJobConfigurationBuilder.java
rename to plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/JobConfigurationTriggeredImplBuilder.java
index afcb852..48b7bd6 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/TriggeredJobConfigurationBuilder.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/JobConfigurationTriggeredImplBuilder.java
@@ -19,7 +19,6 @@
package org.apache.plc4x.java.scraper.config.triggeredscraper;
-import org.apache.commons.lang3.Validate;
import org.apache.plc4x.java.scraper.exception.ScraperConfigurationException;
import java.util.ArrayList;
@@ -27,16 +26,16 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class TriggeredJobConfigurationBuilder {
+public class JobConfigurationTriggeredImplBuilder {
- private final TriggeredScraperConfigurationBuilder parent;
+ private final ScraperConfigurationTriggeredImplBuilder parent;
private final String name;
private final String triggerConfig;
private final List<String> sources = new ArrayList<>();
private final Map<String, String> fields = new HashMap<>();
- public TriggeredJobConfigurationBuilder(TriggeredScraperConfigurationBuilder parent, String name, String triggerConfig) {
+ public JobConfigurationTriggeredImplBuilder(ScraperConfigurationTriggeredImplBuilder parent, String name, String triggerConfig) {
if(parent==null){
throw new ScraperConfigurationException("parent builder cannot be null");
}
@@ -48,7 +47,7 @@ public class TriggeredJobConfigurationBuilder {
this.triggerConfig = triggerConfig;
}
- public TriggeredJobConfigurationBuilder source(String alias) {
+ public JobConfigurationTriggeredImplBuilder source(String alias) {
if(alias==null || alias.isEmpty()){
throw new ScraperConfigurationException("source alias cannot be null or empty");
}
@@ -56,16 +55,16 @@ public class TriggeredJobConfigurationBuilder {
return this;
}
- public TriggeredJobConfigurationBuilder field(String alias, String fieldQuery) {
+ public JobConfigurationTriggeredImplBuilder field(String alias, String fieldQuery) {
this.fields.put(alias, fieldQuery);
return this;
}
- private TriggeredJobConfiguration buildInternal() {
- return new TriggeredJobConfiguration(name, triggerConfig, sources, fields);
+ private JobConfigurationTriggeredImpl buildInternal() {
+ return new JobConfigurationTriggeredImpl(name, triggerConfig,null, sources, fields);
}
- public TriggeredScraperConfigurationBuilder build() {
+ public ScraperConfigurationTriggeredImplBuilder build() {
parent.addJobConfiguration(this.buildInternal());
return this.parent;
}
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/TriggeredScraperConfiguration.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/ScraperConfigurationTriggeredImpl.java
similarity index 50%
rename from plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/TriggeredScraperConfiguration.java
rename to plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/ScraperConfigurationTriggeredImpl.java
index 1051b17..af552a3 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/TriggeredScraperConfiguration.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/ScraperConfigurationTriggeredImpl.java
@@ -21,21 +21,18 @@ package org.apache.plc4x.java.scraper.config.triggeredscraper;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.exc.MismatchedInputException;
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.apache.plc4x.java.scraper.ScrapeJob;
import org.apache.plc4x.java.scraper.ScrapeJobImpl;
+import org.apache.plc4x.java.scraper.config.JobConfigurationImpl;
+import org.apache.plc4x.java.scraper.config.ScraperConfiguration;
import org.apache.plc4x.java.scraper.exception.ScraperException;
import org.apache.plc4x.java.scraper.config.JobConfiguration;
-import org.apache.plc4x.java.scraper.config.JobConfigurationImpl;
+import org.apache.plc4x.java.scraper.config.JobConfigurationClassicImpl;
import org.apache.plc4x.java.scraper.exception.ScraperConfigurationException;
import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScrapeJobImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -46,10 +43,11 @@ import java.util.stream.Collectors;
/**
* Configuration class for {@link org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl}.
*/
-public class TriggeredScraperConfiguration {
+public class ScraperConfigurationTriggeredImpl implements ScraperConfiguration {
+ private static final Logger logger = LoggerFactory.getLogger( ScraperConfigurationTriggeredImpl.class );
private final Map<String, String> sources;
- private final List<TriggeredJobConfiguration> jobConfigurations;
+ private final List<JobConfigurationImpl> jobConfigurations;
/**
* Default constructor.
@@ -58,15 +56,14 @@ public class TriggeredScraperConfiguration {
* @param jobConfigurations List of configurations one for each Job
*/
@JsonCreator
- public TriggeredScraperConfiguration(@JsonProperty(value = "sources", required = true) Map<String, String> sources,
- @JsonProperty(value = "jobs", required = true) List<TriggeredJobConfiguration> jobConfigurations) {
+ public ScraperConfigurationTriggeredImpl(@JsonProperty(value = "sources", required = true) Map<String, String> sources,
+ @JsonProperty(value = "jobs", required = true) List<JobConfigurationImpl> jobConfigurations) {
checkNoUnreferencedSources(sources, jobConfigurations);
- // TODO Warning on too many sources?!
this.sources = sources;
this.jobConfigurations = jobConfigurations;
}
- private void checkNoUnreferencedSources(Map<String, String> sources, List<TriggeredJobConfiguration> jobConfigurations) {
+ private void checkNoUnreferencedSources(Map<String, String> sources, List<JobConfigurationImpl> jobConfigurations) {
Set<String> unreferencedSources = jobConfigurations.stream()
.flatMap(job -> job.getSources().stream())
.filter(source -> !sources.containsKey(source))
@@ -76,74 +73,51 @@ public class TriggeredScraperConfiguration {
}
}
- public static TriggeredScraperConfiguration fromYaml(String yaml) {
- ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
- try {
- return mapper.readValue(yaml, TriggeredScraperConfiguration.class);
- } catch (IOException e) {
- throw new ScraperConfigurationException("Unable to parse given yaml configuration!", e);
- }
- }
-
- public static TriggeredScraperConfiguration fromJson(String json) {
- ObjectMapper mapper = new ObjectMapper(new JsonFactory());
- try {
- return mapper.readValue(json, TriggeredScraperConfiguration.class);
- } catch (IOException e) {
- throw new ScraperConfigurationException("Unable to parse given json configuration!", e);
- }
- }
-
- public static TriggeredScraperConfiguration fromFile(String path) throws IOException {
- ObjectMapper mapper;
- if (path.endsWith("json")) {
- mapper = new ObjectMapper(new JsonFactory());
- } else if (path.endsWith("yml") || path.endsWith("yaml")) {
- mapper = new ObjectMapper(new YAMLFactory());
- } else {
- throw new ScraperConfigurationException("Only files with extensions json, yml or yaml can be read");
- }
- try {
- return mapper.readValue(new File(path), TriggeredScraperConfiguration.class);
- } catch (FileNotFoundException e) {
- throw new ScraperConfigurationException("Unable to find configuration given configuration file at '" + path + "'", e);
- } catch (MismatchedInputException e) {
- throw new ScraperConfigurationException("Given configuration is in wrong format!", e);
- }
- }
-
+ @Override
public Map<String, String> getSources() {
return sources;
}
- public List<TriggeredJobConfiguration> getJobConfigurations() {
+ @Override
+ public List<JobConfigurationImpl> getJobConfigurations() {
return jobConfigurations;
}
+ @Override
public List<ScrapeJob> getJobs() throws ScraperException {
+ return getJobs(jobConfigurations,sources);
+ }
+
+ public static List<ScrapeJob> getJobs(List<JobConfigurationImpl> jobConfigurations, Map<String, String> sources) throws ScraperConfigurationException {
List<ScrapeJob> scrapeJobs = new ArrayList<>();
for(JobConfiguration jobConfiguration:jobConfigurations){
- if(jobConfiguration instanceof JobConfigurationImpl){
- JobConfigurationImpl jobConfigurationImpl = (JobConfigurationImpl)jobConfiguration;
- scrapeJobs.add(new ScrapeJobImpl(jobConfiguration.getName(),
- jobConfigurationImpl.getScrapeRate(),
- getSourcesForAliases(jobConfiguration.getSources()),
+ if(jobConfiguration.getTriggerConfig()!=null){
+ logger.info("Assuming job as triggered job because triggerConfig has been set");
+ scrapeJobs.add(new TriggeredScrapeJobImpl(jobConfiguration.getName(),
+ jobConfiguration.getTriggerConfig(),
+ getSourcesForAliases(jobConfiguration.getSources(),sources),
jobConfiguration.getFields()));
}
- else{
- if(jobConfiguration instanceof TriggeredJobConfiguration){
- TriggeredJobConfiguration triggeredJobConfiguration = (TriggeredJobConfiguration) jobConfiguration;
- scrapeJobs.add(new TriggeredScrapeJobImpl(jobConfiguration.getName(),
- triggeredJobConfiguration.getTriggerConfig(),
- getSourcesForAliases(jobConfiguration.getSources()),
+ else {
+ if(jobConfiguration.getScrapeRate()!=null){
+ logger.info("Assuming job as classic job because triggerConfig has NOT been set but scrapeRate has.");
+ scrapeJobs.add(new ScrapeJobImpl(
+ jobConfiguration.getName(),
+ jobConfiguration.getScrapeRate(),
+ getSourcesForAliases(jobConfiguration.getSources(),sources),
jobConfiguration.getFields()));
}
+ else {
+ logger.info("Job has lack of trigger/scheduled config");
+ throw new ScraperConfigurationException(
+ String.format("Job %s was intended to be o triggered annotation, but no triggerConfig-Field could be found. Canceling!",jobConfiguration.getName()));
+ }
}
}
return scrapeJobs;
}
- private Map<String, String> getSourcesForAliases(List<String> aliases) {
+ private static Map<String, String> getSourcesForAliases(List<String> aliases, Map<String, String> sources) {
return aliases.stream()
.collect(Collectors.toMap(
Function.identity(),
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/TriggeredScraperConfigurationBuilder.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/ScraperConfigurationTriggeredImplBuilder.java
similarity index 62%
rename from plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/TriggeredScraperConfigurationBuilder.java
rename to plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/ScraperConfigurationTriggeredImplBuilder.java
index da02027..6c1dc64 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/TriggeredScraperConfigurationBuilder.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/ScraperConfigurationTriggeredImplBuilder.java
@@ -19,30 +19,32 @@
package org.apache.plc4x.java.scraper.config.triggeredscraper;
+import org.apache.plc4x.java.scraper.config.JobConfigurationImpl;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class TriggeredScraperConfigurationBuilder {
+public class ScraperConfigurationTriggeredImplBuilder {
private final Map<String, String> sources = new HashMap<>();
- private final List<TriggeredJobConfiguration> jobConfigurations = new ArrayList<>();
+ private final List<JobConfigurationImpl> jobConfigurations = new ArrayList<>();
- public TriggeredScraperConfigurationBuilder addSource(String alias, String connectionString) {
+ public ScraperConfigurationTriggeredImplBuilder addSource(String alias, String connectionString) {
sources.put(alias, connectionString);
return this;
}
- public TriggeredJobConfigurationBuilder job(String name, String triggerConfig) {
- return new TriggeredJobConfigurationBuilder(this, name, triggerConfig);
+ public JobConfigurationTriggeredImplBuilder job(String name, String triggerConfig) {
+ return new JobConfigurationTriggeredImplBuilder(this, name, triggerConfig);
}
- public TriggeredScraperConfiguration build() {
- return new TriggeredScraperConfiguration(sources, jobConfigurations);
+ public ScraperConfigurationTriggeredImpl build() {
+ return new ScraperConfigurationTriggeredImpl(sources, jobConfigurations);
}
- public void addJobConfiguration(TriggeredJobConfiguration configuration) {
+ public void addJobConfiguration(JobConfigurationTriggeredImpl configuration) {
this.jobConfigurations.add(configuration);
}
}
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/TriggeredJobConfiguration.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/TriggeredJobConfiguration.java
deleted file mode 100644
index 73bb20e..0000000
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/triggeredscraper/TriggeredJobConfiguration.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.plc4x.java.scraper.config.triggeredscraper;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import org.apache.plc4x.java.scraper.config.JobConfiguration;
-import org.apache.plc4x.java.scraper.config.ScraperConfiguration;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Configuration for one {@link org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScrapeJobImpl} in the {@link org.apache.plc4x.java.scraper.config.triggeredscraper.TriggeredScraperConfiguration}.
- */
-public class TriggeredJobConfiguration implements JobConfiguration {
-
- private final String name;
- private final String triggerConfig;
- private final List<String> sources;
- private final Map<String, String> fields;
-
- /**
- * Default constructor
- * @param name Job Name / identifier
- * @param triggerConfig configuration string for triggered jobs
- * @param sources source alias (<b>not</b> connection string but the alias (from @{@link ScraperConfiguration}).
- * @param fields Map from field alias (how it is named in the result map) to plc4x field query
- */
- @JsonCreator
- public TriggeredJobConfiguration(@JsonProperty(value = "name", required = true) String name,
- @JsonProperty(value = "triggerConfig", required = true) String triggerConfig,
- @JsonProperty(value = "sources", required = true) List<String> sources,
- @JsonProperty(value = "fields", required = true) Map<String, String> fields) {
- this.name = name;
- this.triggerConfig = triggerConfig;
- this.sources = sources;
- this.fields = fields;
- }
-
- public String getName() {
- return name;
- }
-
- public String getTriggerConfig() {
- return triggerConfig;
- }
-
- public List<String> getSources() {
- return sources;
- }
-
- public Map<String, String> getFields() {
- return fields;
- }
-}
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScrapeJobImpl.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScrapeJobImpl.java
index 1a73144..b9f50cc 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScrapeJobImpl.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScrapeJobImpl.java
@@ -20,6 +20,7 @@
package org.apache.plc4x.java.scraper.triggeredscraper;
import org.apache.plc4x.java.scraper.ScrapeJob;
+import org.apache.plc4x.java.scraper.exception.ScraperConfigurationException;
import org.apache.plc4x.java.scraper.exception.ScraperException;
import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.TriggerConfiguration;
@@ -33,7 +34,7 @@ public class TriggeredScrapeJobImpl implements ScrapeJob {
private final TriggerConfiguration triggerConfiguration;
- public TriggeredScrapeJobImpl(String jobName, String triggerConfig, Map<String, String> connections, Map<String, String> fields) throws ScraperException {
+ public TriggeredScrapeJobImpl(String jobName, String triggerConfig, Map<String, String> connections, Map<String, String> fields) throws ScraperConfigurationException {
this.jobName = jobName;
this.triggerConfig = triggerConfig;
this.sourceConnections = connections;
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
index 40ad5d5..386a412 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImpl.java
@@ -23,24 +23,32 @@ import org.apache.commons.collections4.MultiValuedMap;
import org.apache.commons.collections4.multimap.ArrayListValuedHashMap;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
-import org.apache.commons.lang3.tuple.Triple;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.apache.plc4x.java.scraper.*;
+import org.apache.plc4x.java.scraper.config.ScraperConfiguration;
import org.apache.plc4x.java.scraper.exception.ScraperException;
-import org.apache.plc4x.java.scraper.config.triggeredscraper.TriggeredScraperConfiguration;
+import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImpl;
+import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollector;
import org.apache.plc4x.java.scraper.util.PercentageAboveThreshold;
+import org.apache.plc4x.java.spi.PlcDriver;
import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.management.*;
+import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.*;
+import java.util.stream.Collectors;
/**
* replaces the old Scraper that only could do scheduled scraping jobs
@@ -54,40 +62,103 @@ import java.util.concurrent.*;
* right now boolean variables as well as numeric variables could be used as data-types
* available comparators are ==,!= for all data-types and >,>=,<,<= for numeric data-types
*/
-public class TriggeredScraperImpl implements Scraper {
+public class TriggeredScraperImpl implements Scraper, TriggeredScraperMBean {
private static final Logger LOGGER = LoggerFactory.getLogger(TriggeredScraperImpl.class);
+ private static final String MX_DOMAIN = "org.apache.plc4x.java";
- private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10,
- new BasicThreadFactory.Builder()
- .namingPattern("triggeredscraper-scheduling-thread-%d")
- .daemon(false)
- .build()
- );
- private final ExecutorService executorService = Executors.newFixedThreadPool(4,
- new BasicThreadFactory.Builder()
- .namingPattern("triggeredscraper-executer-thread-%d")
- .daemon(true)
- .build()
- );
+ private static final int DEFAULT_FUTURE_TIME_OUT = 2000;
+
+ private final ScheduledExecutorService scheduler;
+ private final ExecutorService executorService;
private final ResultHandler resultHandler;
private final MultiValuedMap<ScrapeJob, ScraperTask> tasks = new ArrayListValuedHashMap<>();
- private final MultiValuedMap<ScraperTask, ScheduledFuture<?>> futures = new ArrayListValuedHashMap<>();
+ private final MultiValuedMap<ScraperTask, ScheduledFuture<?>> scraperTaskMap = new ArrayListValuedHashMap<>();
private final PlcDriverManager driverManager;
private final List<ScrapeJob> jobs;
+ private MBeanServer mBeanServer;
+
+ private long futureTimeOut;
+
+ private final TriggerCollector triggerCollector;
/**
* Creates a Scraper instance from a configuration.
* By default a {@link PooledPlcDriverManager} is used.
* @param config Configuration to use.
- * @param resultHandler
+ * @param resultHandler handler that defines the processing of acquired data
+ */
+ public TriggeredScraperImpl(ScraperConfiguration config, ResultHandler resultHandler, TriggerCollector triggerCollector) throws ScraperException {
+ this(resultHandler, createPooledDriverManager(), config.getJobs(),triggerCollector,DEFAULT_FUTURE_TIME_OUT);
+ }
+
+ /**
+ * Creates a Scraper instance from a configuration.
+ * @param config Configuration to use.
+ * @param plcDriverManager external DriverManager
+ * @param resultHandler handler that defines the processing of acquired data
+ */
+ public TriggeredScraperImpl(ScraperConfiguration config, PlcDriverManager plcDriverManager, ResultHandler resultHandler,TriggerCollector triggerCollector) throws ScraperException {
+ this(resultHandler, plcDriverManager, config.getJobs(),triggerCollector,DEFAULT_FUTURE_TIME_OUT);
+ }
+
+ /**
+ * Creates a Scraper instance from a configuration.
+ * @param config Configuration to use.
+ * @param plcDriverManager external DriverManager
+ * @param resultHandler handler that defines the processing of acquired data
+ */
+ public TriggeredScraperImpl(ScraperConfigurationTriggeredImpl config, PlcDriverManager plcDriverManager, ResultHandler resultHandler, TriggerCollector triggerCollector, int poolSizeScheduler, int poolSizeExecutor) throws ScraperException {
+ this(resultHandler, plcDriverManager, config.getJobs(),triggerCollector,DEFAULT_FUTURE_TIME_OUT,poolSizeScheduler,poolSizeExecutor);
+ }
+
+ /**
+ * Creates a Scraper instance from a configuration.
+ * @param plcDriverManager external DriverManager
+ * @param resultHandler handler that defines the processing of acquired data
+ * @param jobs list of jobs that scraper shall handle
+ * @param triggerCollector a collection that centralizes the trigger requests and joins them to grouped plc requests
+ * @param futureTimeOut max duration of future to return a result
*/
- public TriggeredScraperImpl(TriggeredScraperConfiguration config, ResultHandler resultHandler) throws ScraperException {
- this(resultHandler, createPooledDriverManager(), config.getJobs());
+ public TriggeredScraperImpl(ResultHandler resultHandler, PlcDriverManager plcDriverManager, List<ScrapeJob> jobs,TriggerCollector triggerCollector, long futureTimeOut) {
+ this(resultHandler,plcDriverManager,jobs,triggerCollector,futureTimeOut,20,5);
}
+ public TriggeredScraperImpl(ResultHandler resultHandler, PlcDriverManager plcDriverManager, List<ScrapeJob> jobs,TriggerCollector triggerCollector, long futureTimeOut, int poolSizeScheduler, int poolSizeExecutor) {
+ this.resultHandler = resultHandler;
+ Validate.notEmpty(jobs);
+ this.driverManager = plcDriverManager;
+ this.jobs = jobs;
+ this.triggerCollector = triggerCollector;
+ this.futureTimeOut = futureTimeOut;
+
+ this.scheduler = Executors.newScheduledThreadPool(poolSizeScheduler,
+ new BasicThreadFactory.Builder()
+ .namingPattern("triggeredscraper-scheduling-thread-%d")
+ .daemon(false)
+ .build()
+ );
+
+ this.executorService = Executors.newFixedThreadPool(poolSizeExecutor,
+ new BasicThreadFactory.Builder()
+ .namingPattern("triggeredscraper-executor-thread-%d")
+ .daemon(true)
+ .build()
+ );
+
+
+ // Register MBean
+ mBeanServer = ManagementFactory.getPlatformMBeanServer();
+ try {
+ mBeanServer.registerMBean(this, new ObjectName(MX_DOMAIN, "scraper", "scraper"));
+ } catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException | MalformedObjectNameException e) {
+ LOGGER.debug("Unable to register Scraper as MBean", e);
+ }
+ }
+
+
/**
* Min Idle per Key is set to 1 for situations where the network is broken.
* Then, on reconnect we can fail all getConnection calls (in the ScraperTask) fast until
@@ -96,21 +167,13 @@ public class TriggeredScraperImpl implements Scraper {
private static PooledPlcDriverManager createPooledDriverManager() {
return new PooledPlcDriverManager(pooledPlcConnectionFactory -> {
GenericKeyedObjectPoolConfig<PlcConnection> poolConfig = new GenericKeyedObjectPoolConfig<>();
- poolConfig.setMinIdlePerKey(1); // This should avoid problems with long running connect attempts??
+ poolConfig.setMinIdlePerKey(1);
poolConfig.setTestOnBorrow(true);
poolConfig.setTestOnReturn(true);
return new GenericKeyedObjectPool<>(pooledPlcConnectionFactory, poolConfig);
});
}
-
- public TriggeredScraperImpl(ResultHandler resultHandler, PlcDriverManager driverManager, List<ScrapeJob> jobs) {
- this.resultHandler = resultHandler;
- Validate.notEmpty(jobs);
- this.driverManager = driverManager;
- this.jobs = jobs;
- }
-
/**
* Start the scraping.
*/
@@ -119,39 +182,49 @@ public class TriggeredScraperImpl implements Scraper {
public void start() {
// Schedule all jobs
LOGGER.info("Starting jobs...");
- jobs.stream()
- .flatMap(job -> job.getSourceConnections().entrySet().stream()
- .map(entry -> Triple.of(job, entry.getKey(), entry.getValue()))
- )
- .forEach(
- tuple -> {
+ //start iterating over all available jobs
+ for(ScrapeJob job:jobs){
+ //iterate over all sources the job shall be performed on
+ for(Map.Entry<String,String> sourceEntry:job.getSourceConnections().entrySet()){
+ if(LOGGER.isDebugEnabled()) {
LOGGER.debug("Register task for job {} for conn {} ({}) at rate {} ms",
- tuple.getLeft().getJobName(), tuple.getMiddle(), tuple.getRight(), tuple.getLeft().getScrapeRate());
- TriggeredScraperTask task =
- null;
- try {
- task = new TriggeredScraperTask(driverManager,
- tuple.getLeft().getJobName(),
- tuple.getMiddle(),
- tuple.getRight(),
- tuple.getLeft().getFields(),
- 1_000,
- executorService,
- resultHandler,
- (TriggeredScrapeJobImpl) tuple.getLeft());
- // Add task to internal list
- tasks.put(tuple.getLeft(), task);
- ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(task,
- 0, tuple.getLeft().getScrapeRate(), TimeUnit.MILLISECONDS);
-
- // Store the handle for stopping, etc.
- futures.put(task, future);
- } catch (ScraperException e) {
- LOGGER.warn("Error executing the job {} for conn {} ({}) at rate {} ms",tuple.getLeft().getJobName(), tuple.getMiddle(), tuple.getRight(), tuple.getLeft().getScrapeRate(),e);
+ job.getJobName(),
+ sourceEntry.getKey(),
+ sourceEntry.getValue(),
+ job.getScrapeRate());
+ }
+
+ //create the corresponding triggered scraper task
+ TriggeredScraperTask triggeredScraperTask;
+ try {
+ triggeredScraperTask = new TriggeredScraperTask(
+ driverManager,
+ job.getJobName(),
+ sourceEntry.getKey(),
+ sourceEntry.getValue(),
+ job.getFields(),
+ futureTimeOut,
+ executorService,
+ resultHandler,
+ (TriggeredScrapeJobImpl) job,
+ triggerCollector);
+
+ // Add task to internal list
+ if(LOGGER.isInfoEnabled()) {
+ LOGGER.info("Task {} added to scheduling", triggeredScraperTask);
}
+ registerTaskMBean(triggeredScraperTask);
+ tasks.put(job, triggeredScraperTask);
+ ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(triggeredScraperTask, 0, job.getScrapeRate(), TimeUnit.MILLISECONDS);
+ // Store the handle for stopping, etc.
+ scraperTaskMap.put(triggeredScraperTask, future);
+ } catch (ScraperException e) {
+ LOGGER.warn("Error executing the job {} for conn {} ({}) at rate {} ms",job.getJobName(), sourceEntry.getKey(), sourceEntry.getValue(), job.getScrapeRate(),e);
}
- );
+ }
+
+ }
// Add statistics tracker
scheduler.scheduleAtFixedRate(() -> {
@@ -163,25 +236,124 @@ public class TriggeredScraperImpl implements Scraper {
entry.getValue().getPercentageFailed(),
statistics.apply(new PercentageAboveThreshold(entry.getKey().getScrapeRate() * 1e6)),
statistics.getMin() * 1e-6, statistics.getMean() * 1e-6, statistics.getPercentile(50) * 1e-6);
- LOGGER.debug(msg);
+ if(LOGGER.isDebugEnabled()) {
+ LOGGER.debug(msg);
+ }
}
}, 1_000, 1_000, TimeUnit.MILLISECONDS);
}
- @Override
- public int getNumberOfActiveTasks() {
- return 0;
+ /**
+ * Register a task as MBean
+ * @param task task to register
+ */
+ private void registerTaskMBean(ScraperTask task) {
+ try {
+ mBeanServer.registerMBean(task, new ObjectName(MX_DOMAIN + ":type=ScrapeTask,name=" + task.getJobName() + "-" + task.getConnectionAlias()));
+ } catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException | MalformedObjectNameException e) {
+ LOGGER.debug("Unable to register Task as MBean", e);
+ }
}
@Override
public void stop() {
// Stop all futures
LOGGER.info("Stopping scraper...");
- for (Map.Entry<ScraperTask, ScheduledFuture<?>> entry : futures.entries()) {
+ for (Map.Entry<ScraperTask, ScheduledFuture<?>> entry : scraperTaskMap.entries()) {
LOGGER.debug("Stopping task {}...", entry.getKey());
entry.getValue().cancel(true);
}
// Clear the map
- futures.clear();
+ scraperTaskMap.clear();
+ }
+
+ /**
+ * acquires a plc connection from connection pool
+ * @param plcDriverManager Driver manager handling connection and pools
+ * @param connectionString Connection string as defined in the corresponding implementation of {@link PlcDriver}
+ * @param executorService ExecutorService holding a pool of threads handling requests and stuff
+ * @param requestTimeoutMs maximum time to wait for the future to return a result
+ * @param info additional info for trace reasons
+ * @return the {@link PlcConnection} used for acquiring data from PLC endpoint
+ * @throws InterruptedException
+ * @throws ExecutionException
+ * @throws TimeoutException
+ */
+ public static PlcConnection getPlcConnection(PlcDriverManager plcDriverManager,
+ String connectionString,
+ ExecutorService executorService,
+ long requestTimeoutMs,
+ String info) throws InterruptedException, ExecutionException, TimeoutException {
+ if(!info.isEmpty() && LOGGER.isTraceEnabled()){
+ LOGGER.trace("Additional Info from caller {}", info);
+ }
+ CompletableFuture<PlcConnection> future = CompletableFuture.supplyAsync(() -> {
+ try {
+ return plcDriverManager.getConnection(connectionString);
+ } catch (PlcConnectionException e) {
+ LOGGER.warn("Unable to instantiate connection to " + connectionString, e);
+ throw new PlcRuntimeException(e);
+ }
+ catch (Exception e){
+ LOGGER.warn("Unable to instantiate connection to " + connectionString, e);
+ throw new PlcRuntimeException(e);
+ }
+ }, executorService);
+ if(LOGGER.isTraceEnabled()){
+ LOGGER.trace("try to get a connection to {}", connectionString);
+ }
+ PlcConnection plcConnection=null;
+ try {
+ plcConnection = future.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
+ }
+ catch (Exception e){
+ LOGGER.trace("Additional Info from caller {}", info,e);
+ throw e;
+ }
+ return plcConnection;
+ }
+
+ /**
+ * acquires a plc connection from connection pool
+ * @param plcDriverManager Driver manager handling connection and pools
+ * @param connectionString Connection string as defined in the corresponding implementation of {@link PlcDriver}
+ * @param executorService ExecutorService holding a pool of threads handling requests and stuff
+ * @param requestTimeoutMs maximum time to wait for the future to return a result
+ * @return the {@link PlcConnection} used for acquiring data from PLC endpoint
+ * @throws InterruptedException
+ * @throws ExecutionException
+ * @throws TimeoutException
+ */
+ public static PlcConnection getPlcConnection(PlcDriverManager plcDriverManager,
+ String connectionString,
+ ExecutorService executorService,
+ long requestTimeoutMs) throws InterruptedException, ExecutionException, TimeoutException {
+ return getPlcConnection(plcDriverManager,connectionString,executorService,requestTimeoutMs,"");
+ }
+
+ /**
+ * transforms the results from a {@link PlcReadResponse} into a map
+ * @param plcReadResponse response that shall be converted to map for further processing
+ * @return the converted map
+ */
+ public static Map<String, Object> convertPlcResponseToMap(PlcReadResponse plcReadResponse) {
+ return plcReadResponse.getFieldNames().stream()
+ .collect(Collectors.toMap(
+ name -> name,
+ plcReadResponse::getObject
+ ));
+ }
+
+
+ // MBean methods
+ @Override
+ public boolean isRunning() {
+ // TODO is this okay so?
+ return !scraperTaskMap.isEmpty();
+ }
+
+ @Override
+ public int getNumberOfActiveTasks() {
+ return (int) scraperTaskMap.entries().stream().filter(entry -> !entry.getValue().isDone()).count();
}
}
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfiguration.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperMBean.java
similarity index 76%
copy from plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfiguration.java
copy to plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperMBean.java
index 98b5b79..b8e5232 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfiguration.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperMBean.java
@@ -17,18 +17,17 @@
* under the License.
*/
-package org.apache.plc4x.java.scraper.config;
+package org.apache.plc4x.java.scraper.triggeredscraper;
-import java.util.List;
-import java.util.Map;
+import org.apache.plc4x.java.scraper.Scraper;
/**
- * Created by timbo on 2019-03-05
+ * MBean for {@link Scraper}
*/
-public interface JobConfiguration {
- String getName();
+public interface TriggeredScraperMBean {
- List<String> getSources();
+ boolean isRunning();
+
+ int getNumberOfActiveTasks();
- Map<String, String> getFields();
}
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java
index 56549a5..6465790 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTask.java
@@ -23,8 +23,6 @@ import org.apache.commons.lang3.time.StopWatch;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.PlcConnection;
-import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.apache.plc4x.java.api.types.PlcResponseCode;
@@ -32,6 +30,8 @@ import org.apache.plc4x.java.scraper.ResultHandler;
import org.apache.plc4x.java.scraper.ScraperTask;
import org.apache.plc4x.java.scraper.exception.ScraperException;
import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.TriggerHandler;
+import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.TriggerHandlerImpl;
+import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,9 +46,8 @@ import java.util.stream.Collectors;
/**
* performs the triggered task from a job for one device based on the TriggerHandler as defined in Configuration
- * ToDo Implement the monitoring as well: PLC4X-90
*/
-public class TriggeredScraperTask implements ScraperTask {
+public class TriggeredScraperTask implements ScraperTask, TriggeredScraperTaskMBean {
private static final Logger LOGGER = LoggerFactory.getLogger(TriggeredScraperTask.class);
private final PlcDriverManager driverManager;
@@ -66,6 +65,7 @@ public class TriggeredScraperTask implements ScraperTask {
private final DescriptiveStatistics latencyStatistics = new DescriptiveStatistics(1000);
private final DescriptiveStatistics failedStatistics = new DescriptiveStatistics(1000);
+
public TriggeredScraperTask(PlcDriverManager driverManager,
String jobName,
String connectionAlias,
@@ -74,7 +74,8 @@ public class TriggeredScraperTask implements ScraperTask {
long requestTimeoutMs,
ExecutorService executorService,
ResultHandler resultHandler,
- TriggeredScrapeJobImpl triggeredScrapeJob) throws ScraperException {
+ TriggeredScrapeJobImpl triggeredScrapeJob,
+ TriggerCollector triggerCollector) throws ScraperException {
this.driverManager = driverManager;
this.jobName = jobName;
this.connectionAlias = connectionAlias;
@@ -83,38 +84,48 @@ public class TriggeredScraperTask implements ScraperTask {
this.requestTimeoutMs = requestTimeoutMs;
this.executorService = executorService;
this.resultHandler = resultHandler;
- this.triggerHandler = new TriggerHandler(triggeredScrapeJob.getTriggerConfig(),triggeredScrapeJob,this);
+ this.triggerHandler = new TriggerHandlerImpl(triggeredScrapeJob.getTriggerConfig(),triggeredScrapeJob,this,triggerCollector);
}
@Override
- //ToDo code-refactoring and improved testing --> PLC4X-90
public void run() {
+ if(LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Check condition for task of job {} for connection {}", jobName, connectionAlias);
+ }
if(this.triggerHandler.checkTrigger()) {
// Does a single fetch only when trigger is valid
- LOGGER.trace("Start new scrape of task of job {} for connection {}", jobName, connectionAlias);
+ if(LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Trigger for job {} and device {} is met ... scraping desired data", jobName, connectionAlias);
+ }
+ if(LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Start new scrape of task of job {} for connection {}", jobName, connectionAlias);
+ }
requestCounter.incrementAndGet();
StopWatch stopWatch = new StopWatch();
stopWatch.start();
PlcConnection connection = null;
try {
- CompletableFuture<PlcConnection> future = CompletableFuture.supplyAsync(() -> {
- try {
- return driverManager.getConnection(connectionString);
- } catch (PlcConnectionException e) {
- LOGGER.warn("Unable to instantiate connection to " + connectionString, e);
- throw new PlcRuntimeException(e);
- }
- }, executorService);
- connection = future.get(10 * requestTimeoutMs, TimeUnit.MILLISECONDS);
- LOGGER.trace("Connection to {} established: {}", connectionString, connection);
- PlcReadResponse response;
+ String info = "";
+ if(LOGGER.isTraceEnabled()) {
+ info = String.format("acquiring data collecting connection to (%s,%s)", connectionAlias,jobName);
+ LOGGER.trace("acquiring data collecting connection to ({},{})", connectionAlias,jobName);
+ }
+ connection = TriggeredScraperImpl.getPlcConnection(driverManager,connectionString,executorService,requestTimeoutMs,info);
+ if(LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Connection to {} established: {}", connectionString, connection);
+ }
+
+ PlcReadResponse plcReadResponse;
try {
- PlcReadRequest.Builder builder = connection.readRequestBuilder();
- fields.forEach((alias, qry) -> {
- LOGGER.trace("Requesting: {} -> {}", alias, qry);
- builder.addItem(alias, qry);
- });
- response = builder
+ PlcReadRequest.Builder readRequestBuilder = connection.readRequestBuilder();
+ for(Map.Entry<String,String> entry:fields.entrySet()){
+ if(LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Requesting: {} -> {}", entry.getKey(), entry.getValue());
+ }
+ readRequestBuilder.addItem(entry.getKey(),entry.getValue());
+ }
+ //build and send request and store result in read response
+ plcReadResponse = readRequestBuilder
.build()
.execute()
.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
@@ -123,17 +134,19 @@ public class TriggeredScraperTask implements ScraperTask {
handleException(e);
return;
}
+
// Add statistics
+ LOGGER.debug("Performing statistics");
stopWatch.stop();
latencyStatistics.addValue(stopWatch.getNanoTime());
failedStatistics.addValue(0.0);
successCounter.incrementAndGet();
// Validate response
- validateResponse(response);
+ validateResponse(plcReadResponse);
// Handle response (Async)
- CompletableFuture.runAsync(() -> resultHandler.handle(jobName, connectionAlias, transformResponseToMap(response)), executorService);
+ CompletableFuture.runAsync(() -> resultHandler.handle(jobName, connectionAlias, TriggeredScraperImpl.convertPlcResponseToMap(plcReadResponse)), executorService);
} catch (Exception e) {
- LOGGER.debug("Exception during scrape", e);
+ LOGGER.warn("Exception during scraping of Job {}, Connection-Alias {}: Error-message: {} - for stack-trace change logging to DEBUG", jobName,connectionAlias,e.getCause());
handleException(e);
} finally {
if (connection != null) {
@@ -147,6 +160,11 @@ public class TriggeredScraperTask implements ScraperTask {
}
}
+ /**
+ * detects if {@link PlcReadResponse} is valid
+ * //ToDo CHECK: is this thing S7 specific? if not this comment can be removed
+ * @param response the {@link PlcReadResponse} that should be validated
+ */
private void validateResponse(PlcReadResponse response) {
Map<String, PlcResponseCode> failedFields = response.getFieldNames().stream()
.filter(name -> !PlcResponseCode.OK.equals(response.getResponseCode(name)))
@@ -159,52 +177,42 @@ public class TriggeredScraperTask implements ScraperTask {
}
}
- private Map<String, Object> transformResponseToMap(PlcReadResponse response) {
- return response.getFieldNames().stream()
- .collect(Collectors.toMap(
- name -> name,
- response::getObject
- ));
- }
-
@Override
public String getJobName() {
- return null;
+ return this.jobName;
}
@Override
public String getConnectionAlias() {
- return null;
+ return this.connectionAlias;
}
@Override
public long getRequestCounter() {
- return 0;
+ return this.requestCounter.get();
}
@Override
public long getSuccessfullRequestCounter() {
- return 0;
+ return this.successCounter.get();
}
@Override
public DescriptiveStatistics getLatencyStatistics() {
- return null;
- }
-
- @Override
- public double getPercentageFailed() {
- return 0;
+ return this.latencyStatistics;
}
+ /**
+ * Logs the exception including its stack trace on DEBUG level and records the scrape
+ * as failed in the failure statistics.
+ * @param e exception raised while scraping
+ */
@Override
public void handleException(Exception e) {
-
+ if(LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Detailed exception occurred at scraping", e);
+ }
+ // 1.0 marks a failed scrape; a successful scrape adds 0.0 to the same statistics
+ failedStatistics.addValue(1.0);
}
+ /**
+ * Logs the given response codes on WARN level; no further error handling is performed.
+ * @param failed response codes keyed by field name - presumably the non-OK fields collected
+ * by validateResponse (TODO confirm against the caller, which is outside this hunk)
+ */
@Override
public void handleErrorResponse(Map<String, PlcResponseCode> failed) {
-
+ LOGGER.warn("Handling error responses: {}", failed);
}
public PlcDriverManager getDriverManager() {
@@ -222,4 +230,45 @@ public class TriggeredScraperTask implements ScraperTask {
public long getRequestTimeoutMs() {
return requestTimeoutMs;
}
+
+ /**
+ * Renders the task configuration (driver manager, job, connection, timeout, executor and handlers)
+ * as a single-line description for logging and debugging.
+ * @return textual description of this task
+ */
+ @Override
+ public String toString() {
+ StringBuilder description = new StringBuilder("TriggeredScraperTask{");
+ description.append("driverManager=").append(driverManager);
+ description.append(", jobName='").append(jobName).append('\'');
+ description.append(", connectionAlias='").append(connectionAlias).append('\'');
+ description.append(", connectionString='").append(connectionString).append('\'');
+ description.append(", requestTimeoutMs=").append(requestTimeoutMs);
+ description.append(", executorService=").append(executorService);
+ description.append(", resultHandler=").append(resultHandler);
+ description.append(", triggerHandler=").append(triggerHandler);
+ description.append('}');
+ return description.toString();
+ }
+
+ //---------------------------------
+ // JMX Monitoring
+ //---------------------------------
+ /**
+ * JMX attribute: number of scrapes that have been started so far.
+ * @return current value of the request counter
+ */
+ @Override
+ public long getScrapesTotal() {
+ final long startedScrapes = this.requestCounter.get();
+ return startedScrapes;
+ }
+
+ /**
+ * JMX attribute: number of scrapes for which a read response was received.
+ * @return current value of the success counter
+ */
+ @Override
+ public long getScrapesSuccess() {
+ final long succeededScrapes = this.successCounter.get();
+ return succeededScrapes;
+ }
+
+ /**
+ * JMX attribute: share of scrapes that did not succeed, in percent.
+ * @return percentage of unsuccessful scrapes; 0.0 as long as no scrape has been counted
+ */
+ @Override
+ public double getPercentageFailed() {
+ // read the success counter first: it never exceeds the total, so a total read afterwards
+ // cannot be smaller and the result cannot turn negative under concurrent updates
+ long succeeded = this.getScrapesSuccess();
+ long total = this.getScrapesTotal();
+ if (total == 0) {
+ // nothing scraped yet - without this guard 0.0/0 would report NaN
+ return 0.0;
+ }
+ return 100.0 - (double) succeeded / total * 100.0;
+ }
+
+ /**
+ * JMX attribute: latency percentiles in steps of ten percent (10% up to 100%).
+ * Latencies are recorded in nanoseconds and converted to milliseconds for display.
+ * @return ten formatted entries, one per percentile step
+ */
+ @Override
+ public String[] getPercentiles() {
+ final int steps = 10;
+ String[] formatted = new String[steps];
+ for (int slot = 0; slot < steps; slot++) {
+ int percent = 10 * (slot + 1);
+ double latencyMs = latencyStatistics.getPercentile(10.0 * (slot + 1)) * 1e-6;
+ formatted[slot] = String.format("%d%%: %s ms", percent, latencyMs);
+ }
+ return formatted;
+ }
}
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfiguration.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTaskMBean.java
similarity index 76%
copy from plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfiguration.java
copy to plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTaskMBean.java
index 98b5b79..33c1e98 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfiguration.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperTaskMBean.java
@@ -17,18 +17,19 @@
* under the License.
*/
-package org.apache.plc4x.java.scraper.config;
-
-import java.util.List;
-import java.util.Map;
+package org.apache.plc4x.java.scraper.triggeredscraper;
/**
- * Created by timbo on 2019-03-05
+ * MBean for a scrape job.
*/
-public interface JobConfiguration {
- String getName();
+public interface TriggeredScraperTaskMBean {
+
+ /**
+ * @return number of scrapes started by the task
+ */
+ long getScrapesTotal();
+
+ /**
+ * @return number of scrapes counted as successful
+ */
+ long getScrapesSuccess();
+
+ /**
+ * @return share of scrapes that were not successful, in percent
+ */
+ double getPercentageFailed();
- List<String> getSources();
+ /**
+ * @return formatted latency percentiles; the implementation added in this commit returns
+ * ten entries (10% to 100%) in milliseconds
+ */
+ String[] getPercentiles();
- Map<String, String> getFields();
}
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfiguration.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfiguration.java
index 8d7ba26..0049e55 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfiguration.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfiguration.java
@@ -19,303 +19,339 @@
package org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler;
-import org.apache.plc4x.java.api.exceptions.PlcInvalidFieldException;
import org.apache.plc4x.java.api.model.PlcField;
import org.apache.plc4x.java.s7.model.S7Field;
+import org.apache.plc4x.java.scraper.exception.ScraperConfigurationException;
import org.apache.plc4x.java.scraper.exception.ScraperException;
import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScrapeJobImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* basic configuration for all available triggers and handling of regarding condition
*/
-//ToDo: Improve structure to make it more generic --> PLC4X-89
public class TriggerConfiguration{
private static final Logger logger = LoggerFactory.getLogger(TriggerConfiguration.class);
private static final String S_7_TRIGGER_VAR = "S7_TRIGGER_VAR";
- private static final String SCHEDULED = "SCHEDULED";
+ private static final String SCHEDULED = "SCHEDULED";
+ private static final String PREVIOUS_DEF = "PREV";
private static final double TOLERANCE_FLOATING_EQUALITY = 1e-6;
private static final Pattern TRIGGER_STRATEGY_PATTERN =
- Pattern.compile("\\((?<strategy>[A-Z_0-9]+),(?<scheduledInterval>\\d+)(,(\\((?<triggerVar>\\S+)\\))((?<comp>[!=<>]{1,2}))(\\((?<compVar>[a-z0-9.\\-]+)\\)))?\\)");
+ Pattern.compile("\\((?<strategy>[A-Z_0-9]+),(?<scheduledInterval>\\d+)(,(\\((?<triggerVar>[^!=<>()]+)\\))((?<comp>[!=<>]{1,2}))(\\((?<compVar>[PREVa-z0-9.\\-]+)\\))((?<concatConn>[ANDOR]{2,3})(\\((?<triggerVar2>[^!=<>()]+)\\))((?<comp2>[!=<>]{1,2}))(\\((?<compVar2>[PREVa-z0-9.\\-]+)\\)))?)?\\)");
private final TriggerType triggerType;
private final Long scrapeInterval;
- private final String triggerVariable;
- private final String comparator;
- private Comparators comparatorType;
private TriggeredScrapeJobImpl triggeredScrapeJobImpl;
-
- private final Object compareValue;
- private final PlcField plcField;
+ private List<TriggerElement> triggerElementList;
/**
* default constructor when an S7Field should be used for triggering
* @param triggerType type of trigger from enum
* @param scrapeInterval scrape interval of triggered variable
- * @param triggerVariable field that is conditional for trigger comparison
- * @param comparator selected comparator
- * @param compareValue selected ref-value that is comapred against
+ * @param triggerElementList list of trigger elements which, combined via their concat types, form the trigger condition
* @param triggeredScrapeJobImpl the job which is valid for the configuration
* @throws ScraperException when something goes wrong with configuration
*/
- public TriggerConfiguration(TriggerType triggerType, String scrapeInterval, String triggerVariable, String comparator, String compareValue, TriggeredScrapeJobImpl triggeredScrapeJobImpl) throws ScraperException {
+ public TriggerConfiguration(TriggerType triggerType,
+ String scrapeInterval,
+ List<TriggerElement> triggerElementList,
+ TriggeredScrapeJobImpl triggeredScrapeJobImpl)
+ throws ScraperConfigurationException {
+ this.triggerElementList = triggerElementList;
this.triggerType = triggerType;
this.triggeredScrapeJobImpl = triggeredScrapeJobImpl;
this.scrapeInterval = parseScrapeInterval(scrapeInterval);
- this.triggerVariable = triggerVariable;
- this.comparator = comparator;
+
+ String exceptionMessage;
if(this.triggerType.equals(TriggerType.S7_TRIGGER_VAR)) {
//test for valid field-connection string, on exception quit job and return message to user
- try {
- // TODO: PLC4X-106 - Make the Scraper not depend on S7 directly
- this.plcField = S7Field.of(triggerVariable);
- } catch (PlcInvalidFieldException e) {
- logger.debug(e.getMessage(), e);
- String exceptionMessage = String.format("Invalid trigger Field for Job %s: %s", triggeredScrapeJobImpl.getJobName(), triggerVariable);
- throw new ScraperException(exceptionMessage);
+ if(this.triggerElementList.isEmpty()){
+ exceptionMessage = String.format("No items in trigger List for trigger-type S7_TRIGGER_VAR for Job %s!", triggeredScrapeJobImpl.getJobName());
+ throw new ScraperConfigurationException(exceptionMessage);
}
+ checkTriggerVarList();
+
//ToDo add more and other trigger
}
else{
- String exceptionMessage = String.format("TriggerType %s is not yet implemented", this.triggerType);
- throw new ScraperException(exceptionMessage);
+ exceptionMessage = String.format("TriggerType %s is not yet implemented", this.triggerType);
+ throw new ScraperConfigurationException(exceptionMessage);
}
-
- this.compareValue = convertCompareValue(compareValue);
- detectComparatorType();
- matchTypeAndComparator();
-
}
/**
* default constructor when scheduled trigger shall be performed
* @param triggerType type of trigger from enum
* @param scrapeInterval scrape interval of data from block
- * @throws ScraperException when something goes wrong with configuration
+ * @throws ScraperConfigurationException when something goes wrong with configuration
*/
- public TriggerConfiguration(TriggerType triggerType, String scrapeInterval) throws ScraperException {
+ public TriggerConfiguration(TriggerType triggerType, String scrapeInterval) throws ScraperConfigurationException {
this.triggerType = triggerType;
this.scrapeInterval = parseScrapeInterval(scrapeInterval);
- this.triggerVariable = null;
- this.comparator = null;
- this.compareValue = null;
- this.plcField = null;
- this.comparatorType = null;
+ this.triggerElementList = new ArrayList<>();
+ }
+
+ /**
+ * Verifies the syntax of the trigger list: every element except the first one
+ * has to carry a concat type that links it to its predecessor.
+ * @throws ScraperConfigurationException if a second or later element has no concat type
+ */
+ private void checkTriggerVarList() throws ScraperConfigurationException {
+ // the first element (index 0) needs no concat, so start checking at index 1
+ for(int position=1; position<this.triggerElementList.size(); position++){
+ TriggerElement followingElement = this.triggerElementList.get(position);
+ if(followingElement.getConcatType()==null){
+ throw new ScraperConfigurationException("A concat for the second and following trigger must be given!");
+ }
+ }
}
/**
* parses String of scrape interval
* @param scrapeInterval string extracted from RegEx
* @return converted value
- * @throws ScraperException if parsing could not be performed
+ * @throws ScraperConfigurationException if parsing could not be performed
*/
- private long parseScrapeInterval(String scrapeInterval) throws ScraperException {
+ private long parseScrapeInterval(String scrapeInterval) throws ScraperConfigurationException {
try {
return Long.parseLong(scrapeInterval);
}
catch (Exception e){
handleException(e);
- String exceptionMessage = String.format("No valid numeric for scrapeInterval for Job %s: %s",triggeredScrapeJobImpl.getJobName(),scrapeInterval);
- throw new ScraperException(exceptionMessage);
+ // the scheduled-trigger constructor never assigns a job; without this guard the error path
+ // would end in a NullPointerException instead of the declared configuration exception
+ String jobName = triggeredScrapeJobImpl != null ? triggeredScrapeJobImpl.getJobName() : "<no job>";
+ String exceptionMessage = String.format("No valid numeric for scrapeInterval for Job %s: %s",jobName,scrapeInterval);
+ throw new ScraperConfigurationException(exceptionMessage);
}
}
/**
* evaluates the trigger dependent of base type and converts acquired respectively ref-value to the needed datatype
- * @param value acquired value
+ * @param acquiredValues acquired value
* @return true when condition is matched, false otherwise
* @throws ScraperException when something goes wrong
*/
- boolean evaluateTrigger(Object value) throws ScraperException {
- if(validateDataType().equals(Boolean.class)){
- boolean currentValue;
- boolean refValue;
- try{
- currentValue = (boolean) value;
- refValue = (boolean) compareValue;
- }
- catch (Exception e){
- handleException(e);
- return false;
- }
- if(this.comparatorType.equals(Comparators.EQUAL)){
- return currentValue == refValue;
- }
- else {
- return currentValue != refValue;
- }
- }
- if(validateDataType().equals(Double.class)
- || validateDataType().equals(Integer.class)
- || validateDataType().equals(Long.class)) {
- double currentValue;
- double refValue;
- try{
- refValue = (double) compareValue;
- if(value instanceof Short){
- currentValue = ((Short) value).doubleValue();
- }
- else {
- if (value instanceof Integer) {
- currentValue = ((Integer) value).doubleValue();
- }
- else {
- if (value instanceof Long) {
- currentValue = ((Long) value).doubleValue();
- }
- else{
- if (value instanceof Double) {
- currentValue = (Double) value;
- }else {
- currentValue = (double) value;
- }
- }
- }
-
- }
-
- //
-
- }
- catch (Exception e){
- handleException(e);
- return false;
- }
-
- switch (this.comparatorType) {
- case EQUAL:
- return isApproximately(currentValue,refValue, TOLERANCE_FLOATING_EQUALITY);
- case UNEQUAL:
- return !isApproximately(currentValue,refValue, TOLERANCE_FLOATING_EQUALITY);
- case SMALLER:
- return currentValue < refValue;
- case SMALLER_EQUAL:
- return currentValue <= refValue;
- case GREATER:
- return currentValue > refValue;
- case GREATER_EQUAL:
- return currentValue >= refValue;
- }
-
- }
- //should not happen, as fallback return false which always implies that no data is collected
- return false;
+ boolean evaluateTrigger(List<Object> acquiredValues) throws ScraperException {
+ // delegate to a fresh evaluation of the acquired values against the configured trigger elements
+ return new TriggerEvaluation(acquiredValues,triggerElementList).evaluateTrigger();
}
- /**
- * convertes parsed comparator from regex to ComparatorType
- * @throws ScraperException when no valid comparator has been used
- */
- private void detectComparatorType() throws ScraperException {
- switch (this.comparator){
- case "==":
- this.comparatorType= Comparators.EQUAL;
- break;
- case "!=":
- this.comparatorType= Comparators.UNEQUAL;
- break;
- case "<=":
- this.comparatorType= Comparators.SMALLER_EQUAL;
- break;
- case "<":
- this.comparatorType= Comparators.SMALLER;
- break;
- case ">=":
- this.comparatorType= Comparators.GREATER_EQUAL;
- break;
- case ">":
- this.comparatorType= Comparators.GREATER;
- break;
- default:
- throw new ScraperException("Invalid comparator detected!");
- }
- }
-
- /**
- * matches data-type and comparator for a valid combination
- * @throws ScraperException when invalid combination is detected
- */
- private void matchTypeAndComparator() throws ScraperException {
- if(validateDataType().equals(Boolean.class)
- && !(this.comparatorType.equals(Comparators.EQUAL) || this.comparatorType.equals(Comparators.UNEQUAL))){
- String exceptionMessage = String.format("Trigger-Data-Type (%s) and Comparator (%s) do not match",this.plcField.getDefaultJavaType(),this.comparatorType);
- throw new ScraperException(exceptionMessage);
- }
- //all other combinations are valid
- }
/**
* defines the used base type for comparison
+ * @param plcField field of the trigger variable whose default Java type is checked; a null field is rejected
* @return the detected base type
- * @throws ScraperException when an unsupported S7-Type is choosen,which is not (yet) implemented for comparison
+ * @throws ScraperConfigurationException when no field is given or an unsupported S7-Type is chosen, which is not (yet) implemented for comparison
* ToDo check how to handle time-variables if needed
*/
- private Class<?> validateDataType() throws ScraperException {
- if(this.plcField!=null){
- Class<?> javaDataType = this.plcField.getDefaultJavaType();
+ private static Class<?> validateDataType(PlcField plcField) throws ScraperConfigurationException {
+ if(plcField!=null){
+ Class<?> javaDataType = plcField.getDefaultJavaType();
if(!javaDataType.equals(Boolean.class)
&& !javaDataType.equals(Integer.class)
&& !javaDataType.equals(Long.class)
&& !javaDataType.equals(Double.class)
){
- String exceptionMessage = String.format("Unsupported plc-trigger variable %s with converted data-type %s used",this.plcField,this.plcField.getDefaultJavaType());
- throw new ScraperException(exceptionMessage);
+ String exceptionMessage = String.format("Unsupported plc-trigger variable %s with converted data-type %s used",plcField,plcField.getDefaultJavaType());
+ throw new ScraperConfigurationException(exceptionMessage);
}
return javaDataType;
}
else{
- String exceptionMessage = String.format("Unsupported plc-trigger variable %s with converted data-type %s used",this.plcField,this.plcField.getDefaultJavaType());
- throw new ScraperException(exceptionMessage);
+ // plcField is null in this branch and must not be dereferenced while building the message
+ String exceptionMessage = "Unsupported plc-trigger variable: no field given, data-type cannot be determined";
+ throw new ScraperConfigurationException(exceptionMessage);
}
}
/**
- * parses the ref-value to a given value, as well as checking if ref-value matches to the given data-type
- * @param compareValue compare-value extracted by regex
- * @return converted object to needed data-type
- * @throws ScraperException when something does not match or parsing fails
+ * nested class performing the trigger evaluation
*/
- private Object convertCompareValue(String compareValue) throws ScraperException {
- Class<?> javaDataType =validateDataType();
- if(javaDataType.equals(Boolean.class)){
- switch (compareValue){
- case "1":
- case "true":
- return true;
- case "0":
- case "false":
- return false;
- default:
- String exceptionMessage = String.format("No valid compare Value at DataType Boolean for trigger for Job %s: %s",triggeredScrapeJobImpl.getJobName(),compareValue);
- throw new ScraperException(exceptionMessage);
- }
+ class TriggerEvaluation{
+ private List<Object> acquiredValuesList;
+ private List<TriggerElement> triggerElementList;
+
+ TriggerEvaluation(List<Object> acquiredValuesList, List<TriggerElement> triggerElementList) {
+ this.acquiredValuesList = acquiredValuesList;
+ this.triggerElementList = triggerElementList;
}
- if(javaDataType.equals(Double.class)
- || javaDataType.equals(Integer.class)
- || javaDataType.equals(Long.class)){
- try {
- //everything fits to Double for conversion ... so for first step use only double
- //ToDo if different handling dependent on specific datatype is needed then differ
- return Double.parseDouble(compareValue);
+
+ /**
+ * Evaluates every trigger element against its acquired value and combines the individual
+ * results in list order, using the concat type (AND/OR) of each following element.
+ * In PREV mode the first acquired value only initializes the compare value and counts as matched.
+ * Exactly one result is recorded per element, so results and trigger elements stay index-aligned.
+ * //ToDo refactor this to improve readability
+ * @return true if trigger conditions are met, false otherwise (also when a value could not be converted)
+ * @throws ScraperException if something went wrong
+ */
+ boolean evaluateTrigger() throws ScraperException {
+ List<Boolean> triggerResultList = new ArrayList<>();
+ if(logger.isTraceEnabled()){
+ String connString = "empty";
+ if(!triggerElementList.isEmpty()) {
+ connString = triggerElementList.get(0).getPlcConnectionString();
+ }
+ logger.trace("eval values for job {} and {}: {}",triggeredScrapeJobImpl.getJobName(),connString,acquiredValuesList);
+ }
+ //iterate through all items of acquirement-list
+ for(int countElements=0; countElements<acquiredValuesList.size();countElements++){
+ TriggerElement triggerElement = triggerElementList.get(countElements);
+ Object acquiredObject = acquiredValuesList.get(countElements);
+ //resolve the base type once per element instead of once per type check
+ Class<?> dataType = validateDataType(triggerElement.getPlcField());
+ if(dataType.equals(Boolean.class)){
+ //if given type is Boolean
+ boolean currentValue;
+ boolean refValue;
+ try{
+ currentValue = (boolean) acquiredObject;
+ refValue = (boolean) triggerElement.getCompareValue();
+ }
+ catch (Exception e){
+ handleException(e);
+ return false;
+ }
+ if(triggerElement.getComparatorType().equals(Comparator.EQUAL)){
+ triggerResultList.add(currentValue == refValue);
+ }
+ else {
+ triggerResultList.add(currentValue != refValue);
+ }
+ }
+ if(dataType.equals(Double.class)
+ || dataType.equals(Integer.class)
+ || dataType.equals(Long.class)) {
+ //if given type is numerical
+ boolean skipComparison = false; //comparison shall be skipped if previous values was null
+ double currentValue;
+ double refValue = 0;
+ try{
+
+ if(acquiredObject instanceof Number){
+ //covers Short, Integer, Long and Double alike
+ currentValue = ((Number) acquiredObject).doubleValue();
+ }
+ else {
+ //null or a non-numeric object fails here and is handled by the catch block below
+ currentValue = (double) acquiredObject;
+ }
+ if(triggerElement.getPreviousMode()){
+ if(triggerElement.getCompareValue()==null){
+ triggerElement.setCompareValue(currentValue);
+ triggerElement.setReservedCompareValue(currentValue);
+ triggerResultList.add(true);
+ if(logger.isTraceEnabled()) {
+ logger.trace("Initially set compare value to {}", currentValue);
+ }
+ skipComparison=true;
+ }
+ else{
+ refValue = (double) triggerElement.getCompareValue();
+ }
+ }
+ else {
+ refValue = (double) triggerElement.getCompareValue();
+ }
+
+ }
+ catch (Exception e){
+ handleException(e);
+ return false;
+ }
+
+ //the initial PREV evaluation has already stored its result above; storing a second one
+ //would shift all following results against triggerElementList
+ if(!skipComparison) {
+ boolean triggerResult;
+ switch (triggerElement.getComparatorType()) {
+ case EQUAL:
+ triggerResult = isApproximately(currentValue, refValue, TOLERANCE_FLOATING_EQUALITY);
+ break;
+ case UNEQUAL:
+ triggerResult = !isApproximately(currentValue, refValue, TOLERANCE_FLOATING_EQUALITY);
+ break;
+ case SMALLER:
+ triggerResult = currentValue < refValue;
+ break;
+ case SMALLER_EQUAL:
+ triggerResult = currentValue <= refValue;
+ break;
+ case GREATER:
+ triggerResult = currentValue > refValue;
+ break;
+ case GREATER_EQUAL:
+ triggerResult = currentValue >= refValue;
+ break;
+ default:
+ triggerResult = false;
+ }
+
+ if(triggerResult && triggerElement.getPreviousMode()){
+ triggerElement.setReservedCompareValue(currentValue);
+ if(logger.isTraceEnabled()) {
+ logger.trace("Subcondition matched. Previous value: {}, current compare value {} for Job {}",
+ triggerElement.getReservedCompareValue(),
+ triggerElement.getCompareValue(),
+ triggerElement.getTriggerJob());
+ }
+ }
+ triggerResultList.add(triggerResult);
+ }
+
+ }
+
}
- catch (Exception e){
- logger.debug(e.getMessage(), e);
- String exceptionMessage = String.format("No valid compare Value at DataType Numeric for trigger for Job %s: %s",triggeredScrapeJobImpl.getJobName(),compareValue);
- throw new ScraperException(exceptionMessage);
+ if(triggerResultList.isEmpty()){
+ if(logger.isDebugEnabled()) {
+ logger.debug("No results could be acquired - setting trigger to false");
+ }
+ return false;
+ }
+ //check if there is more then one condition for trigger
+ if(triggerResultList.size()>1) {
+ if(logger.isTraceEnabled()) {
+ logger.trace("{}", triggerResultList);
+ }
+ boolean combinedResult=triggerResultList.get(0);
+ //fold strictly left to right (no AND/OR precedence), bounded by the results actually recorded
+ for (int countElements = 1; countElements < triggerResultList.size(); countElements++) {
+ switch (triggerElementList.get(countElements).getConcatType()){
+ case AND:
+ combinedResult = combinedResult && triggerResultList.get(countElements);
+ break;
+ case OR:
+ combinedResult = combinedResult || triggerResultList.get(countElements);
+ break;
+ default:
+ //should not happen
+ combinedResult = false;
+ }
+ }
+ if(combinedResult) {
+ triggerElementList.forEach(TriggerElement::overrideCompareValue);
+ }
+ return combinedResult;
+ }
+ else{
+ if(triggerResultList.get(0)) {
+ triggerElementList.forEach(TriggerElement::overrideCompareValue);
+ }
+ //return first result because its the only one
+ return triggerResultList.get(0);
+ }
}
- String exceptionMessage = "Invalid Datatype detected ... should not happen and be catcht earlier - please report";
- throw new ScraperException(exceptionMessage);
}
/**
@@ -325,42 +361,80 @@ public class TriggerConfiguration{
* @return created TriggerConfiguration
* @throws ScraperException when something goes wrong
*/
- public static TriggerConfiguration createConfiguration(String jobTriggerStrategy,TriggeredScrapeJobImpl triggeredScrapeJob) throws ScraperException {
+ public static TriggerConfiguration createConfiguration(String jobTriggerStrategy,TriggeredScrapeJobImpl triggeredScrapeJob) throws ScraperConfigurationException {
Matcher matcher = TRIGGER_STRATEGY_PATTERN.matcher(jobTriggerStrategy);
if(matcher.matches()){
- String strat = matcher.group("strategy");
+ String triggerStrategy = matcher.group("strategy");
String scheduledMs = matcher.group("scheduledInterval");
-
- logger.debug("Strategy: {}, scheduled ms: {}",strat,scheduledMs);
+ if(logger.isDebugEnabled()) {
+ logger.debug("Strategy: {}, scheduled ms: {}", triggerStrategy, scheduledMs);
+ }
String triggerVar = matcher.group("triggerVar");
String comparatorString = matcher.group("comp");
String comparatorVariable = matcher.group("compVar");
- switch (strat){
+ switch (triggerStrategy){
case S_7_TRIGGER_VAR:
+
if(triggerVar ==null || comparatorString==null || comparatorVariable==null){
- throw new ScraperException("S7_TRIGGER_VAR trigger strategy needs the trigger-condition - information missing! given configString: "+jobTriggerStrategy);
+ throw new ScraperConfigurationException("S7_TRIGGER_VAR trigger strategy needs the trigger-condition - information missing! given configString: "+jobTriggerStrategy);
+ }
+
+ List<TriggerElement> triggerElements = new ArrayList<>();
+
+ TriggerElement triggerElement = new TriggerElement( // first, mandatory condition
+ comparatorString,
+ null, // no concat type: there is no preceding condition to combine with
+ comparatorVariable,
+ triggerVar,
+ triggerStrategy);
+
+ triggerElement.setTriggerJob(triggeredScrapeJob.getJobName());
+
+ triggerElements.add(triggerElement);
+
+ String concatConn = matcher.group("concatConn");
+ String triggerVar2 = matcher.group("triggerVar2");
+ String comparatorString2 = matcher.group("comp2");
+ String comparatorVariable2 = matcher.group("compVar2");
+
+ if(triggerVar2 != null && comparatorString2 != null && comparatorVariable2 != null && concatConn != null){ // optional second condition: added only when all four groups matched
+ TriggerElement triggerElement2 = new TriggerElement(
+ comparatorString2,
+ concatConn,
+ comparatorVariable2,
+ triggerVar2,
+ triggerStrategy);
+
+
+ triggerElement2.setTriggerJob(triggeredScrapeJob.getJobName());
+ triggerElements.add(triggerElement2);
+
}
- return new TriggerConfiguration(TriggerType.S7_TRIGGER_VAR,scheduledMs,triggerVar,comparatorString,comparatorVariable,triggeredScrapeJob);
+
+ //TODO: at most two conditions are built here; add a strategy for chaining more than two if needed
+ return new TriggerConfiguration(TriggerType.S7_TRIGGER_VAR,scheduledMs,triggerElements,triggeredScrapeJob);
case SCHEDULED:
if(triggerVar !=null || comparatorString!=null || comparatorVariable!=null){
- throw new ScraperException("SCHEDULED trigger strategy must only be used with scheduled interval - nothing more! given configString: "+jobTriggerStrategy);
+ throw new ScraperConfigurationException("SCHEDULED trigger strategy must only be used with scheduled interval - nothing more! given configString: "+jobTriggerStrategy);
}
return new TriggerConfiguration(TriggerType.SCHEDULED,scheduledMs);
default:
- throw new ScraperException("Unknown Trigger Strategy "+strat);
+ throw new ScraperConfigurationException("Unknown Trigger Strategy "+triggerStrategy);
}
}
- throw new ScraperException("Invalid trigger strategy string description: "+jobTriggerStrategy);
+ throw new ScraperConfigurationException("Invalid trigger strategy string description: "+jobTriggerStrategy);
}
private void handleException(Exception e){
//push up if needed
- logger.debug("Exception: ", e);
+ if(logger.isDebugEnabled()) {
+ logger.debug("Exception: ", e); // only logged at debug level; the exception is not rethrown
+ }
}
TriggerType getTriggerType() {
@@ -371,16 +445,8 @@ public class TriggerConfiguration{
return scrapeInterval;
}
- String getTriggerVariable() {
- return triggerVariable;
- }
-
- Comparators getComparatorType() {
- return comparatorType;
- }
-
- Object getCompareValue() {
- return compareValue;
+ public List<TriggerElement> getTriggerElementList() {
+ return triggerElementList; // hands out the internal list itself, not a copy
}
/**
@@ -395,7 +461,7 @@ public class TriggerConfiguration{
return Math.abs(self - other) <= within;
}
- public enum Comparators{
+ public enum Comparator {
EQUAL,
UNEQUAL,
GREATER,
@@ -409,4 +475,248 @@ public class TriggerConfiguration{
SCHEDULED,
S7_TRIGGER_VAR
}
+
+ public enum ConcatType { // logical operator that joins a trigger condition to the result of the preceding ones
+ AND, // combined with && when the trigger is evaluated
+ OR // combined with || when the trigger is evaluated
+ }
+
+    /**
+     * A single trigger condition: the PLC field to watch, the comparator to apply and the
+     * value to compare against (either a fixed value or, in "previous mode", the value
+     * stored from an earlier evaluation).
+     */
+    public static class TriggerElement {
+        private Comparator comparatorType;
+        private ConcatType concatType;
+        //true if the field is compared to a previously stored value instead of a fixed one
+        private boolean previousMode;
+        private Object compareValue;
+        private PlcField plcField;
+        private String plcFieldString;
+
+        private String plcConnectionString;
+
+        private String uuid;
+
+        private String triggerJob;
+
+        //candidate for the next compare value; taken over by overrideCompareValue()
+        private Object reservedCompareValue;
+
+        /**
+         * Creates an element with neutral defaults; used as the base of all other constructors
+         * so that every instance starts from the same state.
+         */
+        public TriggerElement() {
+            this.comparatorType = null;
+            this.concatType = null;
+            this.previousMode = false;
+            this.compareValue = null;
+            this.plcField = null;
+            this.plcFieldString = null;
+            this.reservedCompareValue = null;
+            this.plcConnectionString = "not defined";
+            this.triggerJob = "Not yet defined";
+            this.uuid = "";
+        }
+
+        /**
+         * Creates a fully specified element.
+         *
+         * @param comparatorType comparator to apply
+         * @param concatType     how this condition is combined with the preceding one, may be null
+         * @param previousMode   true to compare against the previously stored value; null is treated as false
+         * @param compareValue   value to compare against
+         * @param plcField       parsed PLC field
+         * @param plcFieldString textual form of the PLC field
+         */
+        public TriggerElement(Comparator comparatorType, ConcatType concatType, Boolean previousMode, Object compareValue, PlcField plcField, String plcFieldString) {
+            //start from the common defaults so connection string, job name and uuid are never null
+            this();
+            this.comparatorType = comparatorType;
+            this.concatType = concatType;
+            //a null flag would otherwise fail on unboxing in overrideCompareValue()
+            this.previousMode = Boolean.TRUE.equals(previousMode);
+            this.compareValue = compareValue;
+            this.plcField = plcField;
+            this.plcFieldString = plcFieldString;
+        }
+
+        /**
+         * Creates an element from already parsed parts, leaving everything else at its default.
+         *
+         * @param comparatorType comparator to apply
+         * @param compareValue   value to compare against
+         * @param plcField       parsed PLC field
+         */
+        public TriggerElement(Comparator comparatorType, Object compareValue, PlcField plcField) {
+            this();
+            this.comparatorType = comparatorType;
+            this.compareValue = compareValue;
+            this.plcField = plcField;
+        }
+
+        /**
+         * Creates an element from the textual parts extracted from a trigger configuration string.
+         * Field, compare value and comparator are only parsed for the S7 trigger strategy.
+         *
+         * @param comparator      comparator symbol, e.g. {@code ==} or {@code <=}
+         * @param concatType      {@code AND} / {@code OR}, or null if there is no preceding condition
+         * @param compareValue    textual compare value
+         * @param plcField        textual PLC field
+         * @param triggerStrategy name of the trigger strategy
+         * @throws ScraperConfigurationException if any part cannot be parsed or the parts do not fit together
+         */
+        TriggerElement(String comparator, String concatType, String compareValue, String plcField, String triggerStrategy) throws ScraperConfigurationException {
+            this();
+            this.plcFieldString = plcField;
+            //constant first: a null strategy simply does not match
+            if (S_7_TRIGGER_VAR.equals(triggerStrategy)) {
+                try {
+                    this.plcField = S7Field.of(this.plcFieldString);
+                }
+                catch (Exception e) {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Exception occurred parsing a S7Field", e);
+                    }
+                    throw new ScraperConfigurationException("Exception on parsing S7Field (" + plcField + "): " + e.getMessage());
+                }
+                this.compareValue = convertCompareValue(compareValue, this.plcField);
+                this.comparatorType = detectComparatorType(comparator);
+                matchTypeAndComparator();
+            }
+
+            this.concatType = detectConcatType(concatType);
+        }
+
+        /**
+         * Parses the textual compare value according to the data type of the given field.
+         * For numeric fields the value {@code PREVIOUS_DEF} switches this element to previous mode
+         * and yields no fixed compare value.
+         *
+         * @param compareValue compare value extracted by regex
+         * @param plcField     field whose data type decides how the value is parsed
+         * @return Boolean for boolean fields, Double for numeric fields, null in previous mode
+         * @throws ScraperConfigurationException when the value is missing, does not fit the data type or the data type is unsupported
+         */
+        private Object convertCompareValue(String compareValue, PlcField plcField) throws ScraperConfigurationException {
+            Class<?> javaDataType = validateDataType(plcField);
+            if (compareValue == null) {
+                throw new ScraperConfigurationException("No compare value given for trigger");
+            }
+            if (javaDataType.equals(Boolean.class)) {
+                switch (compareValue) {
+                    case "1":
+                    case "true":
+                        return true;
+                    case "0":
+                    case "false":
+                        return false;
+                    default:
+                        String exceptionMessage = String.format("No valid compare Value at DataType Boolean for trigger: %s", compareValue);
+                        throw new ScraperConfigurationException(exceptionMessage);
+                }
+            }
+            if (javaDataType.equals(Double.class)
+                || javaDataType.equals(Integer.class)
+                || javaDataType.equals(Long.class)) {
+                if (PREVIOUS_DEF.equals(compareValue)) {
+                    this.previousMode = true;
+                    return null;
+                }
+                try {
+                    //everything fits to Double for conversion ... so for first step use only double
+                    //ToDo if different handling dependent on specific datatype is needed then differ
+                    return Double.parseDouble(compareValue);
+                }
+                catch (NumberFormatException e) {
+                    logger.debug(e.getMessage(), e);
+                    String exceptionMessage = String.format("No valid compare Value at DataType Numeric for trigger: %s", compareValue);
+                    throw new ScraperConfigurationException(exceptionMessage);
+                }
+            }
+            String exceptionMessage = "Invalid Datatype detected ... should not happen and be catcht earlier - please report";
+            throw new ScraperConfigurationException(exceptionMessage);
+        }
+
+        /**
+         * Converts the comparator symbol extracted by regex to a {@link Comparator}.
+         *
+         * @param comparator comparator symbol
+         * @return matching comparator
+         * @throws ScraperConfigurationException when the symbol is missing or not a supported comparator
+         */
+        private Comparator detectComparatorType(String comparator) throws ScraperConfigurationException {
+            if (comparator == null) {
+                throw new ScraperConfigurationException("Invalid comparator detected!");
+            }
+            switch (comparator) {
+                case "==":
+                    return Comparator.EQUAL;
+                case "!=":
+                    return Comparator.UNEQUAL;
+                case "<=":
+                    return Comparator.SMALLER_EQUAL;
+                case "<":
+                    return Comparator.SMALLER;
+                case ">=":
+                    return Comparator.GREATER_EQUAL;
+                case ">":
+                    return Comparator.GREATER;
+                default:
+                    throw new ScraperConfigurationException("Invalid comparator detected!");
+            }
+        }
+
+        /**
+         * Converts the concat keyword extracted by regex to a {@link ConcatType}.
+         *
+         * @param concat {@code AND} / {@code OR}, or null when no concat was configured
+         * @return matching concat type, or null if {@code concat} is null
+         * @throws ScraperConfigurationException when the keyword is neither {@code AND} nor {@code OR}
+         */
+        private ConcatType detectConcatType(String concat) throws ScraperConfigurationException {
+            //concat is not necessary in every case, correct usage is checked later on
+            if (concat == null) {
+                return null;
+            }
+            switch (concat) {
+                case "AND":
+                    return ConcatType.AND;
+                case "OR":
+                    return ConcatType.OR;
+                default:
+                    throw new ScraperConfigurationException("Invalid concat between triggerVars detected: "+concat);
+            }
+        }
+
+        /**
+         * Checks that data type and comparator form a valid combination:
+         * boolean fields may only be compared with EQUAL or UNEQUAL.
+         *
+         * @throws ScraperConfigurationException when an invalid combination is detected
+         */
+        private void matchTypeAndComparator() throws ScraperConfigurationException {
+            boolean equalityComparator = this.comparatorType == Comparator.EQUAL || this.comparatorType == Comparator.UNEQUAL;
+            if (validateDataType(this.plcField).equals(Boolean.class) && !equalityComparator) {
+                String exceptionMessage = String.format("Trigger-Data-Type (%s) and Comparator (%s) do not match", this.plcField.getDefaultJavaType(), this.comparatorType);
+                throw new ScraperConfigurationException(exceptionMessage);
+            }
+            //all other combinations are valid
+        }
+
+        Comparator getComparatorType() {
+            return comparatorType;
+        }
+
+        ConcatType getConcatType() {
+            return concatType;
+        }
+
+        Boolean getPreviousMode() {
+            return previousMode;
+        }
+
+        Object getCompareValue() {
+            return compareValue;
+        }
+
+        PlcField getPlcField() {
+            return plcField;
+        }
+
+        String getPlcFieldString() {
+            return plcFieldString;
+        }
+
+        void setCompareValue(Object compareValue) {
+            this.compareValue = compareValue;
+        }
+
+        Object getReservedCompareValue() {
+            return reservedCompareValue;
+        }
+
+        void setReservedCompareValue(Object reservedCompareValue) {
+            this.reservedCompareValue = reservedCompareValue;
+        }
+
+        String getTriggerJob() {
+            return triggerJob;
+        }
+
+        void setTriggerJob(String triggerJob) {
+            this.triggerJob = triggerJob;
+        }
+
+        /**
+         * In previous mode, replaces the compare value with the reserved one (if any);
+         * does nothing otherwise.
+         */
+        void overrideCompareValue() {
+            if (this.previousMode && this.reservedCompareValue != null) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Compare value overridden, before: {}, now: {}; for Trigger {}", this.compareValue, this.reservedCompareValue, this.triggerJob);
+                }
+                this.compareValue = this.reservedCompareValue;
+            }
+        }
+
+        public String getPlcConnectionString() {
+            return plcConnectionString;
+        }
+
+        public void setPlcConnectionString(String plcConnectionString) {
+            this.plcConnectionString = plcConnectionString;
+        }
+
+        public String getUuid() {
+            return uuid;
+        }
+
+        public void setUuid(String uuid) {
+            this.uuid = uuid;
+        }
+    }
}
\ No newline at end of file
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerHandler.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerHandler.java
index a2f45bb..98582e0 100644
--- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerHandler.java
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerHandler.java
@@ -18,114 +18,6 @@
*/
package org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler;
-import org.apache.plc4x.java.api.PlcConnection;
-import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
-import org.apache.plc4x.java.api.messages.PlcReadRequest;
-import org.apache.plc4x.java.api.messages.PlcReadResponse;
-import org.apache.plc4x.java.scraper.exception.ScraperException;
-import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScrapeJobImpl;
-import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-/**
- * holds the handler for the regarding trigger-scraper on rising-trigger edge
- */
-public class TriggerHandler {
- private static final Logger LOGGER = LoggerFactory.getLogger(TriggerHandler.class);
- private static final String TRIGGER = "trigger";
-
- private final TriggerConfiguration triggerConfiguration;
- private final TriggeredScraperTask parentScraperTask;
-
- //used to enable trigger only on rising edge
- private boolean lastTriggerState;
-
- public TriggerHandler(String triggerStrategy, TriggeredScrapeJobImpl triggeredScrapeJob,TriggeredScraperTask parentScraperTask) throws ScraperException {
- this.triggerConfiguration = TriggerConfiguration.createConfiguration(triggerStrategy,triggeredScrapeJob);
- this.parentScraperTask = parentScraperTask;
- this.lastTriggerState = false;
- }
-
- /**
- * checks rising edge of trigger event
- * @return true on detection of rising edge, false otherwise
- */
- public boolean checkTrigger(){
- switch (this.triggerConfiguration.getTriggerType()){
- case SCHEDULED:
- //used base scheduling -> trigger is always true
- return true;
- case S7_TRIGGER_VAR:
- return checkS7TriggerVariable();
- default:
- //should not happen
- return false;
- }
- }
-
- /**
- * acquires the given S7Field from S7 and evaluates if trigger is released
- * @return true if rising-edge of trigger is detected, false otherwise
- */
- private boolean checkS7TriggerVariable(){
-
- CompletableFuture<PlcConnection> future = CompletableFuture.supplyAsync(() -> {
- try {
- return parentScraperTask.getDriverManager().getConnection(parentScraperTask.getConnectionString());
- } catch (PlcConnectionException e) {
- LOGGER.warn("Unable to instantiate connection to " + parentScraperTask.getConnectionString(), e);
- throw new PlcRuntimeException(e);
- }
- }, parentScraperTask.getExecutorService());
- PlcConnection connection = null;
- try {
- connection = future.get(parentScraperTask.getRequestTimeoutMs(), TimeUnit.MILLISECONDS);
- LOGGER.trace("Connection to {} established: {}", parentScraperTask.getConnectionString(), connection);
- PlcReadRequest.Builder builder = connection.readRequestBuilder();
- builder.addItem(TRIGGER, triggerConfiguration.getTriggerVariable());
- PlcReadResponse response = builder
- .build()
- .execute()
- .get(parentScraperTask.getRequestTimeoutMs(), TimeUnit.MILLISECONDS);
-
- //check if trigger condition from TriggerConfiguration is fulfilled
- boolean trigger = triggerConfiguration.evaluateTrigger(response.getObject(TRIGGER));
-
- //only trigger scraping of data on rising edge of trigger
- if(trigger && !this.lastTriggerState){
- this.lastTriggerState = true;
- return true;
- }
- else{
- this.lastTriggerState = trigger;
- return false;
- }
-
- } catch (Exception e) {
- // Handle execution exception
- parentScraperTask.handleException(e);
- return false;
- }
- finally {
- if (connection != null) {
- try {
- connection.close();
- } catch (Exception e) {
- LOGGER.warn("Error on closing connection",e);
- // intentionally do nothing
- }
- }
- }
- }
-
-
-
-
-
-
+public interface TriggerHandler {
+ boolean checkTrigger(); // true when the trigger fires for this check; TriggerHandlerImpl reports only rising edges for S7 trigger variables
}
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerHandlerImpl.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerHandlerImpl.java
new file mode 100644
index 0000000..6a2c551
--- /dev/null
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerHandlerImpl.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler;
+
+import org.apache.plc4x.java.scraper.exception.ScraperException;
+import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScrapeJobImpl;
+import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperTask;
+import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Default {@link TriggerHandler}: decides whether a triggered scrape job shall run.
+ * Scheduled jobs always fire; S7 trigger variables fire only on the rising edge of the
+ * configured trigger condition. The trigger values themselves are acquired by a
+ * {@link TriggerCollector}.
+ */
+public class TriggerHandlerImpl implements TriggerHandler {
+    private static final Logger LOGGER = LoggerFactory.getLogger(TriggerHandlerImpl.class);
+
+    private final TriggerConfiguration triggerConfiguration;
+
+    private final TriggerCollector triggerCollector;
+
+    //used to enable trigger only on rising edge
+    private boolean lastTriggerState;
+
+    /**
+     * Parses the trigger strategy and registers every trigger field at the collector.
+     *
+     * @param triggerStrategy    textual trigger strategy of the job
+     * @param triggeredScrapeJob job the trigger belongs to
+     * @param parentScraperTask  task that supplies the PLC connection string
+     * @param triggerCollector   collector that acquires the trigger values
+     * @throws ScraperException if the strategy is invalid or a trigger cannot be submitted
+     */
+    public TriggerHandlerImpl(String triggerStrategy, TriggeredScrapeJobImpl triggeredScrapeJob, TriggeredScraperTask parentScraperTask, TriggerCollector triggerCollector) throws ScraperException {
+        this.triggerConfiguration = TriggerConfiguration.createConfiguration(triggerStrategy,triggeredScrapeJob);
+        this.triggerCollector = triggerCollector;
+        this.lastTriggerState = false;
+
+        //transmit needed trigger to triggerCollection
+        String connectionString = parentScraperTask.getConnectionString();
+        for (TriggerConfiguration.TriggerElement triggerElement : triggerConfiguration.getTriggerElementList()) {
+            triggerElement.setPlcConnectionString(connectionString);
+            triggerElement.setUuid(triggerCollector.submitTrigger(triggerElement.getPlcFieldString(), connectionString, this.triggerConfiguration.getScrapeInterval()));
+        }
+    }
+
+    /**
+     * checks rising edge of trigger event
+     * @return true on detection of rising edge, false otherwise
+     */
+    @Override
+    public boolean checkTrigger(){
+        switch (this.triggerConfiguration.getTriggerType()){
+            case SCHEDULED:
+                //used base scheduling -> trigger is always true
+                return true;
+            case S7_TRIGGER_VAR:
+                return checkS7TriggerVariable();
+            default:
+                //should not happen
+                return false;
+        }
+    }
+
+    /**
+     * Fetches the latest value of every trigger field from the collector and evaluates the
+     * trigger condition on them.
+     *
+     * @return true if rising-edge of trigger is detected, false otherwise (also when any value is
+     *         not available, because the condition cannot be evaluated on an incomplete set)
+     */
+    private boolean checkS7TriggerVariable(){
+        List<Object> acquiredValuesList = new ArrayList<>();
+        for (TriggerConfiguration.TriggerElement triggerElement : triggerConfiguration.getTriggerElementList()) {
+            Object result;
+            try {
+                result = triggerCollector.requestResult(triggerElement.getUuid());
+            } catch (ScraperException e) {
+                LOGGER.warn("Went wrong",e);
+                //values are matched to trigger elements by position, so a gap would shift
+                //all following values to the wrong condition - give up for this cycle
+                return false;
+            }
+            if (result == null) {
+                return false;
+            }
+            acquiredValuesList.add(result);
+        }
+
+        //check if trigger condition from TriggerConfiguration is fulfilled
+        boolean trigger = false;
+        try {
+            trigger = triggerConfiguration.evaluateTrigger(acquiredValuesList);
+        } catch (ScraperException e) {
+            LOGGER.warn("Could not evaluate trigger", e);
+        }
+
+        //only trigger scraping of data on rising edge of trigger
+        boolean risingEdge = trigger && !this.lastTriggerState;
+        this.lastTriggerState = trigger;
+        return risingEdge;
+    }
+
+}
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/collector/TriggerCollector.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/collector/TriggerCollector.java
new file mode 100644
index 0000000..5e1ea0f
--- /dev/null
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/collector/TriggerCollector.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector;
+
+import org.apache.plc4x.java.scraper.exception.ScraperException;
+
+/**
+ * Contract for a component that gathers the trigger requests of all scrape jobs and
+ * acquires them collectively, so that jobs needing a PlcConnection for their trigger
+ * do not each have to query the PLC on their own.
+ */
+public interface TriggerCollector {
+
+    /**
+     * Registers a field that shall be acquired for trigger evaluation.
+     *
+     * @param plcField            textual address of the (plc) field used for triggering
+     * @param plcConnectionString connection string of the source the field lives on
+     * @param maxAwaitingTime     longest time that may pass before the field is requested again
+     * @return uuid under which the request is handled internally; needed to fetch the result
+     * @throws ScraperException if the request cannot be registered
+     */
+    String submitTrigger(String plcField, String plcConnectionString, long maxAwaitingTime) throws ScraperException;
+
+    /**
+     * Fetches the result of a submitted request, using the implementation's default timeout.
+     *
+     * @param uuid uuid returned when the request was submitted
+     * @return the object acquired from the plc instance
+     * @throws ScraperException if the result cannot be provided
+     */
+    Object requestResult(String uuid) throws ScraperException;
+
+    /**
+     * Fetches the result of a submitted request.
+     *
+     * @param uuid    uuid returned when the request was submitted
+     * @param timeout time limit within which the response shall be acquired
+     * @return the object acquired from the plc instance
+     * @throws ScraperException if the result cannot be provided
+     */
+    Object requestResult(String uuid, long timeout) throws ScraperException;
+
+    /**
+     * Begins acquiring the submitted triggers.
+     */
+    void start();
+
+    /**
+     * Ends the acquirement of triggers.
+     */
+    void stop();
+}
diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/collector/TriggerCollectorImpl.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/collector/TriggerCollectorImpl.java
new file mode 100644
index 0000000..a87cc73
--- /dev/null
+++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/collector/TriggerCollectorImpl.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector;
+
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.plc4x.java.PlcDriverManager;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.scraper.exception.ScraperException;
+import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.LocalDateTime;
+import java.time.temporal.ChronoUnit;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Default implementation for TriggerCollector: a scheduled task periodically reads all
+ * registered trigger fields that are due, using one connection and one read request per PLC,
+ * and stores the values so that trigger handlers can fetch them by uuid.
+ * <p>
+ * Results are written by the scheduler thread and read by the threads calling
+ * {@link #requestResult(String)}.
+ */
+public class TriggerCollectorImpl implements TriggerCollector {
+    private static final Logger logger = LoggerFactory.getLogger( TriggerCollectorImpl.class );
+
+    private static final int DEFAULT_SCHEDULED_TRIGGER_INTERVAL = 1000;
+    private static final int FUTURE_TIMEOUT = 2000;
+    private static final int READ_REQUEST_TIMEOUT = 2000;
+
+    private final PlcDriverManager plcDriverManager;
+    private final Map<String,RequestElement> currentRequestElements;
+    private long schedulerInterval;
+    private final long futureTimeout;
+
+    private final ScheduledExecutorService scheduledExecutorService;
+    private final ExecutorService executorService;
+
+    /**
+     * @param plcDriverManager  driver manager used to obtain PLC connections
+     * @param schedulerInterval period of the acquisition task in milliseconds
+     * @param futureTimeout     timeout in milliseconds for connecting and for reading
+     * @param poolSizeScheduler thread pool size of the scheduler
+     * @param poolSizeExecutor  thread pool size of the executor used for connecting
+     */
+    public TriggerCollectorImpl(PlcDriverManager plcDriverManager, long schedulerInterval, long futureTimeout, int poolSizeScheduler, int poolSizeExecutor) {
+        this.plcDriverManager = plcDriverManager;
+        this.currentRequestElements = new ConcurrentHashMap<>();
+        this.schedulerInterval = schedulerInterval;
+        this.futureTimeout = futureTimeout;
+
+        this.scheduledExecutorService = Executors.newScheduledThreadPool(poolSizeScheduler,
+            new BasicThreadFactory.Builder()
+                .namingPattern("triggercollector-scheduledExecutorService-thread-%d")
+                .daemon(false)
+                .build()
+        );
+        this.executorService = Executors.newFixedThreadPool(poolSizeExecutor,
+            new BasicThreadFactory.Builder()
+                .namingPattern("triggercollector-executerService-thread-%d")
+                .daemon(true)
+                .build()
+        );
+
+    }
+
+    public TriggerCollectorImpl(PlcDriverManager plcDriverManager, long schedulerInterval, long futureTimeout) {
+        this(plcDriverManager,schedulerInterval,futureTimeout,10,20);
+    }
+
+    public TriggerCollectorImpl(PlcDriverManager plcDriverManager) {
+        this(plcDriverManager,DEFAULT_SCHEDULED_TRIGGER_INTERVAL, FUTURE_TIMEOUT);
+    }
+
+    /**
+     * submits a trigger request to TriggerCollector
+     * <p>
+     * A field that is already registered for the same connection is not registered twice;
+     * the existing uuid is returned and the shorter of both intervals is kept.
+     *
+     * @param plcField a (plc) field that is used for triggering procedure
+     * @param plcConnectionString the connection string to the regarding source
+     * @param interval max awaiting time until request shall be submitted
+     * @return a uuid under that the request is handled internally
+     * @throws ScraperException declared by the interface; not thrown by this implementation
+     */
+    @Override
+    public String submitTrigger(String plcField, String plcConnectionString, long interval) throws ScraperException {
+        String uuid = UUID.randomUUID().toString();
+
+        if(this.schedulerInterval>interval){
+            this.schedulerInterval=interval;
+        }
+
+        RequestElement requestElement = new RequestElement(plcConnectionString,plcField,interval, uuid);
+        //single pass: look for an equal request (same connection and field) before adding a new one
+        for(RequestElement requestElementFromMap:currentRequestElements.values()){
+            if(requestElementFromMap.equals(requestElement)){
+                if(logger.isTraceEnabled()) {
+                    logger.trace("Received a placed trigger");
+                }
+                //detect shortest interval if trigger used more than once
+                if(requestElementFromMap.getScanIntervalMs()>interval){
+                    requestElementFromMap.setScanIntervalMs(interval);
+                }
+                return requestElementFromMap.getUuid();
+            }
+        }
+
+        currentRequestElements.put(uuid,requestElement);
+        if(logger.isDebugEnabled()) {
+            logger.debug("Received request to: {} for PLC: {}", plcField, plcConnectionString);
+        }
+        return uuid;
+    }
+
+    /**
+     * acquire all triggers within given interval from definition
+     * <p>
+     * Never lets an exception escape: a task run by scheduleAtFixedRate is silently cancelled
+     * once one execution throws. Connections opened in a cycle are always closed again.
+     * A request whose read failed keeps its previous result and stays due, so it is retried
+     * in the next cycle.
+     */
+    private void processActiveTrigger(){
+        List<PlcConnection> plcConnectionList = new ArrayList<>();
+        try {
+            LocalDateTime currentTimestamp = LocalDateTime.now();
+            Map<String, PlcReadRequest.Builder> plcReadRequestBuilderMap = new HashMap<>();
+            List<RequestElement> activeRequestElements = new ArrayList<>();
+            //connections that could not be opened in this cycle - do not wait for the timeout again
+            Set<String> failedConnections = new HashSet<>();
+
+            for(Map.Entry<String,RequestElement> entry:currentRequestElements.entrySet()){
+                RequestElement requestElement = entry.getValue();
+                boolean due = requestElement.getLastAcquirement().isBefore(
+                    currentTimestamp.minus(requestElement.getScanIntervalMs(),ChronoUnit.MILLIS));
+                String plcConnectionString = requestElement.getPlcConnectionString();
+                if(!due || failedConnections.contains(plcConnectionString)){
+                    continue;
+                }
+                PlcReadRequest.Builder builder = plcReadRequestBuilderMap.get(plcConnectionString);
+                if(builder==null){
+                    builder = openReadRequestBuilder(plcConnectionString, plcConnectionList);
+                    if(builder==null){
+                        failedConnections.add(plcConnectionString);
+                        continue;
+                    }
+                    plcReadRequestBuilderMap.put(plcConnectionString,builder);
+                }
+                builder.addItem(entry.getKey(),requestElement.getPlcField());
+                activeRequestElements.add(requestElement);
+            }
+
+            Map<String, PlcReadResponse> plcReadResponseMap = executeReadRequests(plcReadRequestBuilderMap);
+
+            LocalDateTime currentTime = LocalDateTime.now();
+            for(RequestElement requestElement:activeRequestElements){
+                PlcReadResponse plcReadResponse = plcReadResponseMap.get(requestElement.getPlcConnectionString());
+                if(plcReadResponse==null){
+                    //read failed or timed out for this PLC
+                    continue;
+                }
+                try {
+                    requestElement.setResult(plcReadResponse.getObject(requestElement.getUuid()));
+                    requestElement.setLastAcquirement(currentTime);
+                } catch (RuntimeException e) {
+                    logger.warn("Could not extract result for {}", requestElement, e);
+                }
+            }
+        } catch (RuntimeException e) {
+            logger.warn("Unexpected error while acquiring triggers", e);
+        } finally {
+            for(PlcConnection plcConnectionFromList:plcConnectionList){
+                if(plcConnectionFromList!=null){
+                    try {
+                        plcConnectionFromList.close();
+                    } catch (Exception e) {
+                        logger.warn("Could not close connection ...", e);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Opens a connection to the given PLC and creates a read request builder on it.
+     *
+     * @param plcConnectionString connection string of the PLC
+     * @param openedConnections   receives the opened connection so that the caller can close it
+     * @return the builder, or null if the connection could not be acquired
+     */
+    private PlcReadRequest.Builder openReadRequestBuilder(String plcConnectionString, List<PlcConnection> openedConnections){
+        try {
+            String info = "";
+            if(logger.isTraceEnabled()) {
+                info = String.format("acquiring trigger connection to (%s)", plcConnectionString);
+                logger.trace("acquiring trigger connection to ({})", plcConnectionString);
+            }
+            PlcConnection plcConnection = TriggeredScraperImpl.getPlcConnection(plcDriverManager,plcConnectionString,executorService,futureTimeout,info);
+            openedConnections.add(plcConnection);
+            return plcConnection.readRequestBuilder();
+        } catch (InterruptedException e) {
+            logger.warn("Acquirement of PLC-Connection was interrupted",e);
+            Thread.currentThread().interrupt();
+        } catch (ExecutionException e) {
+            logger.warn("Acquirement of PLC-Connection could not be executed",e);
+        } catch (TimeoutException e) {
+            logger.warn("Acquirement of PLC-Connection was timeouted",e);
+        }
+        return null;
+    }
+
+    /**
+     * Executes one read request per PLC; a failure on one PLC does not affect the others.
+     *
+     * @param plcReadRequestBuilderMap read request builders keyed by connection string
+     * @return responses keyed by connection string; PLCs whose read failed have no entry
+     */
+    private Map<String, PlcReadResponse> executeReadRequests(Map<String, PlcReadRequest.Builder> plcReadRequestBuilderMap){
+        Map<String, PlcReadResponse> plcReadResponseMap = new HashMap<>();
+        for(Map.Entry<String,PlcReadRequest.Builder> entry: plcReadRequestBuilderMap.entrySet()){
+            try {
+                PlcReadResponse plcReadResponse = entry.getValue().build().execute().get(futureTimeout, TimeUnit.MILLISECONDS);
+                plcReadResponseMap.put(entry.getKey(), plcReadResponse);
+            } catch (InterruptedException e) {
+                logger.warn("Extraction of PlcResponse was interrupted",e);
+                Thread.currentThread().interrupt();
+            } catch (ExecutionException | RuntimeException e) {
+                logger.warn("Extraction of PlcResponse could not be executed",e);
+            } catch (TimeoutException e) {
+                logger.warn("Extraction of PlcResponse was timeouted",e);
+            }
+        }
+        return plcReadResponseMap;
+    }
+
+    /**
+     * requests the result of submitted plc request with default timeout
+     *
+     * @param uuid uuid that represents the request
+     * @return the object acquired by requesting plc instance
+     */
+    @Override
+    public Object requestResult(String uuid) throws ScraperException {
+        return requestResult(uuid, READ_REQUEST_TIMEOUT);
+    }
+
+    /**
+     * requests the result of submitted plc request
+     * <p>
+     * Returns the most recently stored value without waiting.
+     *
+     * @param uuid uuid that represents the request
+     * @param timeout currently not used
+     * @return the object acquired by requesting plc instance; null if nothing has been acquired
+     *         yet or no request is registered under the given uuid
+     */
+    @Override
+    public Object requestResult(String uuid, long timeout){
+        //ConcurrentHashMap does not accept null keys
+        RequestElement requestElement = uuid==null ? null : currentRequestElements.get(uuid);
+        if(requestElement==null){
+            logger.warn("No trigger request registered for uuid {}", uuid);
+            return null;
+        }
+        return requestElement.getResult();
+    }
+
+    /**
+     * starts the acquirement of triggers
+     * <p>
+     * The scheduler interval is read once here; triggers submitted afterwards do not change
+     * the period of the already scheduled task.
+     */
+    @Override
+    public void start() {
+        this.scheduledExecutorService.scheduleAtFixedRate(this::processActiveTrigger, 1_000, this.schedulerInterval, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * stops acquirement of triggers
+     */
+    @Override
+    public void stop() {
+        this.scheduledExecutorService.shutdown();
+        this.executorService.shutdown();
+    }
+
+
+    /**
+     * One registered trigger field together with its latest acquired value.
+     * Two elements are equal if they address the same field on the same connection.
+     */
+    class RequestElement{
+        private final String plcConnectionString;
+        private final String plcField;
+        private final String uuid;
+        //written by the scheduler thread, read by the threads requesting results
+        private volatile LocalDateTime lastAcquirement;
+        private volatile Object result;
+        private volatile long scanIntervalMs;
+
+
+        RequestElement(String plcConnectionString, String plcField, long scanIntervalMs, String uuid) {
+            this.plcConnectionString = plcConnectionString;
+            this.plcField = plcField;
+            this.uuid = uuid;
+            this.scanIntervalMs = scanIntervalMs;
+            //set initial acquirement to a long time ago
+            this.lastAcquirement = LocalDateTime.of(1,1,1,1,1,1);
+        }
+
+        String getPlcConnectionString() {
+            return plcConnectionString;
+        }
+
+        String getPlcField() {
+            return plcField;
+        }
+
+        public Object getResult() {
+            return result;
+        }
+
+        public void setResult(Object result) {
+            this.result = result;
+        }
+
+        String getUuid() {
+            return uuid;
+        }
+
+        long getScanIntervalMs() {
+            return scanIntervalMs;
+        }
+
+        void setScanIntervalMs(long scanIntervalMs) {
+            this.scanIntervalMs = scanIntervalMs;
+        }
+
+        LocalDateTime getLastAcquirement() {
+            return lastAcquirement;
+        }
+
+        void setLastAcquirement(LocalDateTime lastAcquirement) {
+            this.lastAcquirement = lastAcquirement;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            RequestElement that = (RequestElement) o;
+            return Objects.equals(plcConnectionString, that.plcConnectionString) &&
+                Objects.equals(plcField, that.plcField);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(plcConnectionString, plcField);
+        }
+
+        @Override
+        public String toString() {
+            return "RequestElement{" +
+                "plcConnectionString='" + plcConnectionString + '\'' +
+                ", plcField='" + plcField + '\'' +
+                ", lastAcquirement=" + lastAcquirement +
+                ", result=" + result +
+                ", uuid='" + uuid + '\'' +
+                ", scanIntervalMs=" + scanIntervalMs +
+                '}';
+        }
+    }
+
+}
diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperConfigurationTest.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperConfigurationTest.java
index a153b1c..11f4c35 100644
--- a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperConfigurationTest.java
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperConfigurationTest.java
@@ -24,8 +24,9 @@ import com.fasterxml.jackson.databind.exc.MismatchedInputException;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.scraper.config.JobConfiguration;
-import org.apache.plc4x.java.scraper.config.JobConfigurationImpl;
+import org.apache.plc4x.java.scraper.config.JobConfigurationClassicImpl;
import org.apache.plc4x.java.scraper.config.ScraperConfiguration;
+import org.apache.plc4x.java.scraper.config.ScraperConfigurationClassicImpl;
import org.apache.plc4x.java.scraper.exception.ScraperException;
import org.assertj.core.api.WithAssertions;
import org.junit.jupiter.api.Nested;
@@ -58,7 +59,7 @@ class ScraperConfigurationTest implements WithAssertions {
" a: DBasdf\n" +
" b: DBbsdf\n";
- ScraperConfiguration configuration = mapper.readValue(yaml, ScraperConfiguration.class);
+ ScraperConfiguration configuration = mapper.readValue(yaml, ScraperConfigurationClassicImpl.class);
assertThat(configuration.getJobConfigurations()).hasSize(1);
JobConfiguration conf = configuration.getJobConfigurations().get(0);
@@ -71,7 +72,7 @@ class ScraperConfigurationTest implements WithAssertions {
.containsEntry("a3", "b");
assertThat(conf.getName()).isEqualTo("job1");
- assertThat(((JobConfigurationImpl)conf).getScrapeRate()).isEqualTo(10);
+ assertThat(((JobConfigurationClassicImpl)conf).getScrapeRate()).isEqualTo(10);
assertThat(conf.getSources())
.hasSize(3);
@@ -91,7 +92,7 @@ class ScraperConfigurationTest implements WithAssertions {
" sources:\n" +
" - a1\n";
- assertThatThrownBy(() -> mapper.readValue(jobs, ScraperConfiguration.class))
+ assertThatThrownBy(() -> mapper.readValue(jobs, ScraperConfigurationClassicImpl.class))
.isInstanceOf(MismatchedInputException.class);
}
@@ -112,7 +113,7 @@ class ScraperConfigurationTest implements WithAssertions {
" a: DBasdf\n" +
" b: DBbsdf\n";
- assertThatCode(() -> ScraperConfiguration.fromYaml(yaml))
+ assertThatCode(() -> ScraperConfiguration.fromYaml(yaml, ScraperConfigurationClassicImpl.class))
.doesNotThrowAnyException();
}
@@ -141,7 +142,7 @@ class ScraperConfigurationTest implements WithAssertions {
" ]\n" +
"}";
- assertThatCode(() -> ScraperConfiguration.fromJson(json))
+ assertThatCode(() -> ScraperConfiguration.fromJson(json, ScraperConfigurationClassicImpl.class))
.doesNotThrowAnyException();
}
@@ -156,7 +157,7 @@ class ScraperConfigurationTest implements WithAssertions {
" - s1\n" +
" fields:\n";
- assertThatThrownBy(() -> ScraperConfiguration.fromYaml(yaml))
+ assertThatThrownBy(() -> ScraperConfiguration.fromYaml(yaml, ScraperConfigurationClassicImpl.class))
.isInstanceOf(PlcRuntimeException.class)
.hasStackTraceContaining("unreferenced sources: [s1]");
}
@@ -173,7 +174,7 @@ class ScraperConfigurationTest implements WithAssertions {
" fields:\n" +
" field1: 'DB1 Field 1'\n";
- List<ScrapeJob> jobs = ScraperConfiguration.fromYaml(yaml).getJobs();
+ List<ScrapeJob> jobs = ScraperConfiguration.fromYaml(yaml, ScraperConfigurationClassicImpl.class).getJobs();
assertThat(jobs).hasSize(1);
ScrapeJob job = jobs.get(0);
@@ -193,12 +194,12 @@ class ScraperConfigurationTest implements WithAssertions {
@Test
void json() throws IOException {
- ScraperConfiguration conf = ScraperConfiguration.fromFile("src/test/resources/config.json");
+ ScraperConfiguration conf = ScraperConfiguration.fromFile("src/test/resources/config.json", ScraperConfigurationClassicImpl.class);
}
@Test
void yaml() throws IOException {
- ScraperConfiguration conf = ScraperConfiguration.fromFile("src/test/resources/config.yml");
+ ScraperConfiguration conf = ScraperConfiguration.fromFile("src/test/resources/config.yml", ScraperConfigurationClassicImpl.class);
}
}
}
\ No newline at end of file
diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperRunner.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperRunner.java
index f3548bc..3a6f543 100644
--- a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperRunner.java
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/ScraperRunner.java
@@ -20,6 +20,7 @@
package org.apache.plc4x.java.scraper;
import org.apache.plc4x.java.scraper.config.ScraperConfiguration;
+import org.apache.plc4x.java.scraper.config.ScraperConfigurationClassicImpl;
import org.apache.plc4x.java.scraper.exception.ScraperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,7 +32,7 @@ public class ScraperRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(ScraperRunner.class);
public static void main(String[] args) throws IOException, ScraperException {
- ScraperConfiguration configuration = ScraperConfiguration.fromFile("plc4j/utils/scraper/src/test/resources/example.yml");
+ ScraperConfiguration configuration = ScraperConfiguration.fromFile("plc4j/utils/scraper/src/test/resources/example.yml", ScraperConfigurationClassicImpl.class);
Scraper scraper = new ScraperImpl(configuration, (j, a, m) -> LOGGER.info("Results from {}/{}: {}", j, a, m));
scraper.start();
diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/TriggeredScraperRunner.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/TriggeredScraperRunner.java
index 3f5461f..381df5a 100644
--- a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/TriggeredScraperRunner.java
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/TriggeredScraperRunner.java
@@ -19,11 +19,16 @@
package org.apache.plc4x.java.scraper;
-import org.apache.plc4x.java.scraper.config.triggeredscraper.TriggeredScraperConfiguration;
+import org.apache.plc4x.java.PlcDriverManager;
+import org.apache.plc4x.java.scraper.config.ScraperConfiguration;
+import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImpl;
import org.apache.plc4x.java.scraper.exception.ScraperException;
import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl;
+import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollector;
+import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollectorImpl;
+import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,9 +42,14 @@ public class TriggeredScraperRunner {
* testing of TriggeredScraper vs real device
*/
public static void main(String[] args) throws IOException, ScraperException {
- TriggeredScraperConfiguration configuration = TriggeredScraperConfiguration.fromFile("plc4j/utils/scraper/src/test/resources/example_triggered_scraper.yml");
- TriggeredScraperImpl scraper = new TriggeredScraperImpl(configuration, (j, a, m) -> LOGGER.info("Results from {}/{}: {}", j, a, m));
+
+ ScraperConfiguration configuration = ScraperConfiguration.fromFile("plc4j/utils/scraper/src/test/resources/example_triggered_scraper.yml", ScraperConfigurationTriggeredImpl.class);
+
+ PlcDriverManager plcDriverManager = new PooledPlcDriverManager();
+ TriggerCollector triggerCollector = new TriggerCollectorImpl(plcDriverManager);
+ TriggeredScraperImpl scraper = new TriggeredScraperImpl(configuration, (j, a, m) -> LOGGER.info("Results from {}/{}: {}", j, a, m),triggerCollector);
scraper.start();
+ triggerCollector.start();
}
}
diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/TriggeredScraperRunnerModbus.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/TriggeredScraperRunnerModbus.java
index 3ed4fe3..fd3fe3a 100644
--- a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/TriggeredScraperRunnerModbus.java
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/TriggeredScraperRunnerModbus.java
@@ -19,10 +19,14 @@
package org.apache.plc4x.java.scraper;
+import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.modbus.connection.ModbusConnectionFactory;
-import org.apache.plc4x.java.scraper.config.triggeredscraper.TriggeredScraperConfiguration;
+import org.apache.plc4x.java.scraper.config.ScraperConfiguration;
+import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImpl;
import org.apache.plc4x.java.scraper.exception.ScraperException;
import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl;
+import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollectorImpl;
+import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,15 +42,20 @@ public class TriggeredScraperRunnerModbus {
* testing of TriggeredScraper vs real device (Modbus)
*/
public static void main(String[] args) throws IOException, ScraperException {
- TriggeredScraperConfiguration configuration = TriggeredScraperConfiguration.fromFile("plc4j/utils/scraper/src/test/resources/example_triggered_scraper_modbus.yml");
- TriggeredScraperImpl scraper = new TriggeredScraperImpl(configuration, (j, a, m) -> {
+ ScraperConfiguration configuration = ScraperConfiguration.fromFile("plc4j/utils/scraper/src/test/resources/example_triggered_scraper_modbus.yml", ScraperConfigurationTriggeredImpl.class);
+ PlcDriverManager plcDriverManager = new PooledPlcDriverManager();
+ TriggeredScraperImpl scraper = new TriggeredScraperImpl(
+ configuration,
+ plcDriverManager,
+ (j, a, m) -> {
LOGGER.info("Results from {}/{}: {}", j, a, m);
for(Map.Entry<String, Object> entry:m.entrySet()){
for(Object object:(List<Object>)entry.getValue()){
LOGGER.info("{}",object);
}
}
- });
+ },
+ new TriggerCollectorImpl(plcDriverManager));
scraper.start();
}
diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/config/ScraperConfigurationBuilderTest.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/config/ScraperConfigurationBuilderTest.java
index 71b16a8..5ce58eb 100644
--- a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/config/ScraperConfigurationBuilderTest.java
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/config/ScraperConfigurationBuilderTest.java
@@ -23,8 +23,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.assertj.core.api.WithAssertions;
-import org.junit.Ignore;
-import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
@@ -33,18 +31,18 @@ class ScraperConfigurationBuilderTest implements WithAssertions {
//ToDo test is failing idon't know why (Tim)
void builder_usage_example() throws JsonProcessingException {
- ScraperConfigurationBuilder builder = new ScraperConfigurationBuilder();
+ ScraperConfigurationClassicImplBuilder builder = new ScraperConfigurationClassicImplBuilder();
List<String> sources = Arrays.asList("s1", "s2");
List<String> jobs = Arrays.asList("j1", "j2");
sources.forEach(source -> builder.addSource(source, source));
for (String job : jobs) {
- JobConfigurationImplBuilder jobConfigurationImplBuilder = builder.job(job, 10);
- sources.forEach(jobConfigurationImplBuilder::source);
+ JobConfigurationClassicImplBuilder jobConfigurationClassicImplBuilder = builder.job(job, 10);
+ sources.forEach(jobConfigurationClassicImplBuilder::source);
for (int i = 1; i <= 10; i++) {
- jobConfigurationImplBuilder.field("f" + i, "qry" + i);
+ jobConfigurationClassicImplBuilder.field("f" + i, "qry" + i);
}
- jobConfigurationImplBuilder.build();
+ jobConfigurationClassicImplBuilder.build();
}
ScraperConfiguration configuration = builder.build();
diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplTest.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplTest.java
new file mode 100644
index 0000000..d1bbefa
--- /dev/null
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/triggeredscraper/TriggeredScraperImplTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.plc4x.java.scraper.triggeredscraper;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.plc4x.java.PlcDriverManager;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.base.messages.items.DefaultBooleanFieldItem;
+import org.apache.plc4x.java.base.messages.items.DefaultLongFieldItem;
+import org.apache.plc4x.java.mock.MockDevice;
+import org.apache.plc4x.java.mock.PlcMockConnection;
+import org.apache.plc4x.java.scraper.config.ScraperConfiguration;
+import org.apache.plc4x.java.scraper.config.ScraperConfigurationClassicImpl;
+import org.apache.plc4x.java.scraper.exception.ScraperException;
+import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollector;
+import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollectorImpl;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Random;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ *
+ * @author julian
+ * Created by julian on 2019-05-08
+ */
+public class TriggeredScraperImplTest {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TriggeredScraperImplTest.class);
+ private PlcDriverManager driverManager;
+ private MockDevice mockDevice1;
+ private MockDevice mockDevice2;
+
+ @Before
+ public void setUp() throws Exception {
+ driverManager = new PlcDriverManager();
+ PlcMockConnection mock1Connection = ((PlcMockConnection) driverManager.getConnection("mock:1"));
+ PlcMockConnection mock2Connection = ((PlcMockConnection) driverManager.getConnection("mock:2"));
+
+ // Create Mocks
+ mockDevice1 = Mockito.mock(MockDevice.class);
+ mockDevice2 = Mockito.mock(MockDevice.class);
+ // Assign to Connections
+ mock1Connection.setDevice(mockDevice1);
+ mock2Connection.setDevice(mockDevice2);
+ }
+
+ /**
+ * Test is added because we assume some strange behavior.
+ */
+ @Test
+ public void scrapeMultipleTargets() throws ScraperException, IOException, InterruptedException {
+ // Prepare the Mocking
+ // Scrate Jobs 1 and 2
+ when(mockDevice1.read(eq("%DB810:DBB0:USINT"))).thenReturn(Pair.of(PlcResponseCode.OK, new DefaultLongFieldItem(1L)));
+ when(mockDevice2.read(eq("%DB810:DBB0:USINT"))).thenReturn(Pair.of(PlcResponseCode.OK, new DefaultLongFieldItem(2L)));
+ // Trigger Jobs
+ // Trigger var
+ Random rand = new Random();
+ when(mockDevice1.read(eq("%M0.3:BOOL"))).thenAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ boolean trigger = rand.nextBoolean();
+ System.out.println(trigger);
+ return Pair.of(PlcResponseCode.OK, new DefaultBooleanFieldItem(trigger));
+ }
+ });
+ when(mockDevice2.read(eq("%M0.3:BOOL"))).thenAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ boolean trigger = rand.nextBoolean();
+ System.out.println("\t\t" + trigger);
+ return Pair.of(PlcResponseCode.OK, new DefaultBooleanFieldItem(trigger));
+ }
+ });
+ // Read var
+ when(mockDevice1.read(eq("%DB810:DBW0:INT"))).thenReturn(Pair.of(PlcResponseCode.OK, new DefaultLongFieldItem(3L)));
+ when(mockDevice2.read(eq("%DB810:DBW0:INT"))).thenReturn(Pair.of(PlcResponseCode.OK, new DefaultLongFieldItem(4L)));
+
+ ScraperConfiguration configuration = ScraperConfiguration.fromFile("src/test/resources/mock-scraper-config.yml", ScraperConfigurationClassicImpl.class);
+ TriggerCollector triggerCollector = new TriggerCollectorImpl(driverManager);
+ TriggeredScraperImpl scraper = new TriggeredScraperImpl((j, a, m) -> System.out.println(String.format("Results from %s/%s: %s", j, a, m)), driverManager, configuration.getJobs(),triggerCollector,1000);
+
+ scraper.start();
+
+ Thread.sleep(2_000);
+
+ scraper.stop();
+ }
+}
\ No newline at end of file
diff --git a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfigurationTest.java b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfigurationTest.java
index 3bd4d98..f2c1810 100644
--- a/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfigurationTest.java
+++ b/plc4j/utils/scraper/src/test/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfigurationTest.java
@@ -19,6 +19,7 @@
package org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler;
+import org.apache.plc4x.java.scraper.exception.ScraperConfigurationException;
import org.apache.plc4x.java.scraper.exception.ScraperException;
import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScrapeJobImpl;
import org.apache.plc4x.test.FastTests;
@@ -32,6 +33,7 @@ import org.slf4j.LoggerFactory;
import java.util.stream.Stream;
+import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.hamcrest.core.IsNull.notNullValue;
import static org.junit.Assert.assertThat;
@@ -44,15 +46,26 @@ class TriggerConfigurationTest {
private static Stream<Arguments> validTriggerPattern() {
return Stream.of(
- Arguments.of("(S7_TRIGGER_VAR,50,(%I0.1:BOOL)==(true))",TriggerConfiguration.TriggerType.S7_TRIGGER_VAR, 50, TriggerConfiguration.Comparators.EQUAL, true),
- Arguments.of("(S7_TRIGGER_VAR,50,(%I0.1:BOOL)!=(0))",TriggerConfiguration.TriggerType.S7_TRIGGER_VAR, 50, TriggerConfiguration.Comparators.UNEQUAL, false),
- Arguments.of("(S7_TRIGGER_VAR,50,(%DB111:DBW10:INT)<=(33))",TriggerConfiguration.TriggerType.S7_TRIGGER_VAR, 50, TriggerConfiguration.Comparators.SMALLER_EQUAL, 33.0),
- Arguments.of("(S7_TRIGGER_VAR,50,(%DB111:DBB10:USINT)>=(33))",TriggerConfiguration.TriggerType.S7_TRIGGER_VAR, 50, TriggerConfiguration.Comparators.GREATER_EQUAL, 33.0),
- Arguments.of("(S7_TRIGGER_VAR,50,(%DB111:DBD10:DINT)<(33))",TriggerConfiguration.TriggerType.S7_TRIGGER_VAR, 50, TriggerConfiguration.Comparators.SMALLER, 33.0),
- Arguments.of("(S7_TRIGGER_VAR,50,(%DB111:DBD10:REAL)>(33.3))",TriggerConfiguration.TriggerType.S7_TRIGGER_VAR, 50, TriggerConfiguration.Comparators.GREATER, 33.3),
- Arguments.of("(S7_TRIGGER_VAR,50,(%DB111:DBD10:REAL)>(33.3))",TriggerConfiguration.TriggerType.S7_TRIGGER_VAR, 50, TriggerConfiguration.Comparators.GREATER, 33.3),
- Arguments.of("(S7_TRIGGER_VAR,50,(%DB111:DBD10:REAL)>(-1))",TriggerConfiguration.TriggerType.S7_TRIGGER_VAR, 50, TriggerConfiguration.Comparators.GREATER, -1.0),
- Arguments.of("(SCHEDULED,1000)",TriggerConfiguration.TriggerType.SCHEDULED, 1000, null, null)
+
+ Arguments.of("(S7_TRIGGER_VAR,50,(%I0.1:BOOL)==(true))",TriggerConfiguration.TriggerType.S7_TRIGGER_VAR, 50, TriggerConfiguration.Comparator.EQUAL, true,false,null,null,null,null),
+ Arguments.of("(S7_TRIGGER_VAR,50,(%I0.1:BOOL)!=(0))",TriggerConfiguration.TriggerType.S7_TRIGGER_VAR, 50, TriggerConfiguration.Comparator.UNEQUAL, false,false,null,null,null,null),
+ Arguments.of("(S7_TRIGGER_VAR,50,(%DB111:DBW10:INT)<=(33))",TriggerConfiguration.TriggerType.S7_TRIGGER_VAR, 50, TriggerConfiguration.Comparator.SMALLER_EQUAL, 33.0,false,null,null,null,null),
+ Arguments.of("(S7_TRIGGER_VAR,50,(%DB111:DBB10:USINT)>=(33))",TriggerConfiguration.TriggerType.S7_TRIGGER_VAR, 50, TriggerConfiguration.Comparator.GREATER_EQUAL, 33.0,false,null,null,null,null),
+ Arguments.of("(S7_TRIGGER_VAR,50,(%DB111:DBD10:DINT)<(33))",TriggerConfiguration.TriggerType.S7_TRIGGER_VAR, 50, TriggerConfiguration.Comparator.SMALLER, 33.0,false,null,null,null,null),
+ Arguments.of("(S7_TRIGGER_VAR,50,(%DB111:DBD10:REAL)>(33.3))",TriggerConfiguration.TriggerType.S7_TRIGGER_VAR, 50, TriggerConfiguration.Comparator.GREATER, 33.3,false,null,null,null,null),
+ Arguments.of("(S7_TRIGGER_VAR,50,(%DB111:DBD10:REAL)>(33.3))",TriggerConfiguration.TriggerType.S7_TRIGGER_VAR, 50, TriggerConfiguration.Comparator.GREATER, 33.3,false,null,null,null,null),
+ Arguments.of("(S7_TRIGGER_VAR,50,(%DB111:DBD10:REAL)>(-1))",TriggerConfiguration.TriggerType.S7_TRIGGER_VAR, 50, TriggerConfiguration.Comparator.GREATER, -1.0,false,null,null,null,null),
+ Arguments.of("(S7_TRIGGER_VAR,50,(%DB111:DBD10:REAL)>(PREV))",TriggerConfiguration.TriggerType.S7_TRIGGER_VAR, 50, TriggerConfiguration.Comparator.GREATER, null,true,null,null,null,null),
+ Arguments.of("(S7_TRIGGER_VAR,50,(%DB111:DBD10:REAL)>(PREV))",TriggerConfiguration.TriggerType.S7_TRIGGER_VAR, 50, TriggerConfiguration.Comparator.GREATER, null,true,null,null,null,null),
+ Arguments.of("(SCHEDULED,1000)",TriggerConfiguration.TriggerType.SCHEDULED, 1000, null, null,null,null,null,null,null),
+ Arguments.of("(S7_TRIGGER_VAR,50,(%DB111:DBD10:REAL)>(PREV)AND(%DB111:DBD20:REAL)>(PREV))",TriggerConfiguration.TriggerType.S7_TRIGGER_VAR, 50,
+ TriggerConfiguration.Comparator.GREATER, null,true,
+ TriggerConfiguration.Comparator.GREATER, null,true,
+ TriggerConfiguration.ConcatType.AND),
+ Arguments.of("(S7_TRIGGER_VAR,200,(%DB111:DBD10:REAL)>(PREV)OR(%DB111:DBD20:REAL)>(PREV))",TriggerConfiguration.TriggerType.S7_TRIGGER_VAR, 200,
+ TriggerConfiguration.Comparator.GREATER, null,true,
+ TriggerConfiguration.Comparator.GREATER, null,true,
+ TriggerConfiguration.ConcatType.OR)
);
}
@@ -69,7 +82,10 @@ class TriggerConfigurationTest {
Arguments.of("(S7_TRIGGER_VAR,50,(%DB111:DBX10.1:BOOL)<=(true))"),
Arguments.of("(S7_TRIGGER_VAR,50,(%DB111:DBW10:INT)<=(true))"),
Arguments.of("(MODBUS_TRIGGER_VAR,50)"),
- Arguments.of("(MODBUS_TRIGGER_VAR,50,(%DB111:DBW10:INT)<=(11))")
+ Arguments.of("(MODBUS_TRIGGER_VAR,50,(%DB111:DBW10:INT)<=(11))"),
+ Arguments.of("(S7_TRIGGER_VAR,50,(%DB111:DBD10:REAL)>(prev))"),
+ Arguments.of("(S7_TRIGGER_VAR,200,(%DB111:DBD10:REAL)>(PREV)OR(%DB111:DBD20:INT)>(17))"),
+ Arguments.of("(S7_TRIGGER_VAR,200,(%DB111:DBD10:REAL)>(PREV)AND)")
);
}
@@ -79,21 +95,38 @@ class TriggerConfigurationTest {
void testValidFieldQueryParsing(String triggerConfig,
TriggerConfiguration.TriggerType triggerType,
long scrapeInterval,
- TriggerConfiguration.Comparators comparator,
- Object refValue) {
+ TriggerConfiguration.Comparator comparator1,
+ Object refValue1,
+ Boolean previousMode1,
+ TriggerConfiguration.Comparator comparator2,
+ Object refValue2,
+ Boolean previousMode2,
+ TriggerConfiguration.ConcatType concatType
+ ) {
TriggeredScrapeJobImpl triggeredScrapeJob = Mockito.mock(TriggeredScrapeJobImpl.class);
TriggerConfiguration triggerConfiguration = null;
try {
triggerConfiguration = TriggerConfiguration.createConfiguration(triggerConfig,triggeredScrapeJob);
- } catch (ScraperException e) {
+ } catch (ScraperConfigurationException e) {
//should not happen
}
assertThat(triggerConfiguration, notNullValue());
assertThat(triggerConfiguration.getScrapeInterval(), equalTo(scrapeInterval));
assertThat(triggerConfiguration.getTriggerType(), equalTo(triggerType));
- assertThat(triggerConfiguration.getComparatorType(), equalTo(comparator));
- assertThat(triggerConfiguration.getCompareValue(), equalTo(refValue));
+ if(!triggerConfiguration.getTriggerElementList().isEmpty()) {
+ assertThat(triggerConfiguration.getTriggerElementList().get(0).getComparatorType(), equalTo(comparator1));
+ assertThat(triggerConfiguration.getTriggerElementList().get(0).getCompareValue(), equalTo(refValue1));
+ assertThat(triggerConfiguration.getTriggerElementList().get(0).getPreviousMode(), equalTo(previousMode1));
+ assertThat(triggerConfiguration.getTriggerElementList().get(0).getConcatType(), nullValue());
+
+ if (triggerConfiguration.getTriggerElementList().size() > 1) {
+ assertThat(triggerConfiguration.getTriggerElementList().get(1).getComparatorType(), equalTo(comparator2));
+ assertThat(triggerConfiguration.getTriggerElementList().get(1).getCompareValue(), equalTo(refValue2));
+ assertThat(triggerConfiguration.getTriggerElementList().get(1).getPreviousMode(), equalTo(previousMode2));
+ assertThat(triggerConfiguration.getTriggerElementList().get(1).getConcatType(), equalTo(concatType));
+ }
+ }
}
@@ -107,7 +140,7 @@ class TriggerConfigurationTest {
triggerConfiguration = TriggerConfiguration.createConfiguration(triggerConfig,triggeredScrapeJob);
assertThat(triggerConfiguration,null);
//NPE should happen when test fails!
- } catch (ScraperException e) {
+ } catch (ScraperConfigurationException e) {
LOGGER.info("Exception as expected for positive test result: {}",e.getMessage());
//should happen
}
diff --git a/plc4j/utils/scraper/src/test/resources/example_triggered_scraper.yml b/plc4j/utils/scraper/src/test/resources/example_triggered_scraper.yml
index b3c0c75..ec79e17 100644
--- a/plc4j/utils/scraper/src/test/resources/example_triggered_scraper.yml
+++ b/plc4j/utils/scraper/src/test/resources/example_triggered_scraper.yml
@@ -18,17 +18,16 @@
# ----------------------------------------------------------------------------
---
sources:
- S7_PI: s7://192.168.167.210/0/1
+ S7_PI: s7://192.168.167.210/1/1
jobs:
- name: scheduled-demo-job1
- triggerConfig: (SCHEDULED,100)
+ triggerConfig: (SCHEDULED,10000)
sources:
- S7_PI
fields:
test1: '%DB810:DBB0:USINT'
-
- name: triggered-demo-job1
triggerConfig: (S7_TRIGGER_VAR,10,(%M0.3:BOOL)==(true))
sources:
@@ -36,9 +35,8 @@ jobs:
fields:
test1: '%DB810:DBW0:INT'
-
- name: triggered-demo-job2
- triggerConfig: (S7_TRIGGER_VAR,10,(%M0.7:BOOL)==(true))
+ triggerConfig: (S7_TRIGGER_VAR,1000,(%M0.7:BOOL)==(true))
sources:
- S7_PI
fields:
@@ -47,10 +45,30 @@ jobs:
test3: '%DB810:DBX266:STRING'
test4: '%DB810:DBX526:STRING'
test5: '%DB810:DBX786:STRING'
- test6: '%DB810:DBX46806:STRING'
test7: '%DB810:DBD2:REAL'
- test8: '%DB811:DBX12:STRING'
- test9: '%DB811:DBX280:STRING'
- test10: '%DB811:DBB1000:BYTE[8]'
- test11: '%DB811:DBX268.3:BOOL'
- test12: '%DB811:DBB270:BYTE[8]'
\ No newline at end of file
+ test8: '%DB811:DBB1000:BYTE[8]'
+ test9: '%DB811:DBX268.3:BOOL'
+ test10: '%DB811:DBB270:BYTE[8]'
+
+ - name: scheduled-string-job3
+ triggerConfig: (SCHEDULED,2000)
+ sources:
+ - S7_PI
+ fields:
+ test3_8: '%DB811:DBX14:STRING'
+ test3_9: '%DB811:DBX282:STRING'
+
+
+ - name: triggered-demo-job3-prev_value
+ triggerConfig: (S7_TRIGGER_VAR,500,(%M0:USINT)>=(PREV))
+ sources:
+ - S7_PI
+ fields:
+ test1: '%DB810:DBW0:INT'
+
+ - name: triggered-demo-job4-combinded-condition
+ triggerConfig: (S7_TRIGGER_VAR,5,(%M0.1:BOOL)==(true)OR(%M0.2:BOOL)==(true))
+ sources:
+ - S7_PI
+ fields:
+ test1: '%M0:USINT'
\ No newline at end of file
diff --git a/plc4j/utils/scraper/src/test/resources/logback-test.xml b/plc4j/utils/scraper/src/test/resources/logback-test.xml
index c562020..3da2de8 100644
--- a/plc4j/utils/scraper/src/test/resources/logback-test.xml
+++ b/plc4j/utils/scraper/src/test/resources/logback-test.xml
@@ -29,8 +29,11 @@
</encoder>
</appender>
- <root level="INFO">
+ <root level="WARN">
<appender-ref ref="STDOUT"/>
</root>
+ <logger name="org.apache.plc4x.java.scraper.triggeredscraper" level="WARN" additivity="false">
+ <appender-ref ref="STDOUT"/>
+ </logger>
</configuration>
\ No newline at end of file
diff --git a/plc4j/utils/scraper/src/test/resources/example_triggered_scraper.yml b/plc4j/utils/scraper/src/test/resources/mock-scraper-config.yml
similarity index 72%
copy from plc4j/utils/scraper/src/test/resources/example_triggered_scraper.yml
copy to plc4j/utils/scraper/src/test/resources/mock-scraper-config.yml
index b3c0c75..face235 100644
--- a/plc4j/utils/scraper/src/test/resources/example_triggered_scraper.yml
+++ b/plc4j/utils/scraper/src/test/resources/mock-scraper-config.yml
@@ -18,13 +18,23 @@
# ----------------------------------------------------------------------------
---
sources:
- S7_PI: s7://192.168.167.210/0/1
+ MOCK_1: mock:1
+ MOCK_2: mock:2
jobs:
- name: scheduled-demo-job1
triggerConfig: (SCHEDULED,100)
sources:
- - S7_PI
+ - MOCK_1
+ - MOCK_2
+ fields:
+ test1: '%DB810:DBB0:USINT'
+
+ - name: scheduled-demo-job2
+ triggerConfig: (SCHEDULED,100)
+ sources:
+ - MOCK_1
+ - MOCK_2
fields:
test1: '%DB810:DBB0:USINT'
@@ -32,7 +42,8 @@ jobs:
- name: triggered-demo-job1
triggerConfig: (S7_TRIGGER_VAR,10,(%M0.3:BOOL)==(true))
sources:
- - S7_PI
+ - MOCK_1
+ - MOCK_2
fields:
test1: '%DB810:DBW0:INT'
@@ -40,17 +51,7 @@ jobs:
- name: triggered-demo-job2
triggerConfig: (S7_TRIGGER_VAR,10,(%M0.7:BOOL)==(true))
sources:
- - S7_PI
+ - MOCK_1
+ - MOCK_2
fields:
- test1: '%DB810:DBW0:INT'
- test2: '%DB810:DBX6:STRING'
- test3: '%DB810:DBX266:STRING'
- test4: '%DB810:DBX526:STRING'
- test5: '%DB810:DBX786:STRING'
- test6: '%DB810:DBX46806:STRING'
- test7: '%DB810:DBD2:REAL'
- test8: '%DB811:DBX12:STRING'
- test9: '%DB811:DBX280:STRING'
- test10: '%DB811:DBB1000:BYTE[8]'
- test11: '%DB811:DBX268.3:BOOL'
- test12: '%DB811:DBB270:BYTE[8]'
\ No newline at end of file
+ test1: '%DB810:DBW0:INT'
\ No newline at end of file