You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2020/03/10 14:58:37 UTC
[plc4x] branch develop updated: - Added the analog input
subscription functionality
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new e52774a - Added the analog input subscription functionality
e52774a is described below
commit e52774ac236a12eacfd6003a992eed82019267dd
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Tue Mar 10 15:58:29 2020 +0100
- Added the analog input subscription functionality
---
.../plc4x/java/transport/serial/SerialChannel.java | 5 +-
.../main/resources/protocols/firmata/firmata.mspec | 1 +
sandbox/test-java-firmata-driver/pom.xml | 9 +-
.../java/firmata/readwrite/FirmataDriver.java | 55 +++++++--
.../readwrite/protocol/FirmataProtocolLogic.java | 137 +++++++++++++--------
5 files changed, 138 insertions(+), 69 deletions(-)
diff --git a/plc4j/transports/serial/src/main/java/org/apache/plc4x/java/transport/serial/SerialChannel.java b/plc4j/transports/serial/src/main/java/org/apache/plc4x/java/transport/serial/SerialChannel.java
index b9d5ee6..f49b0fd 100644
--- a/plc4j/transports/serial/src/main/java/org/apache/plc4x/java/transport/serial/SerialChannel.java
+++ b/plc4j/transports/serial/src/main/java/org/apache/plc4x/java/transport/serial/SerialChannel.java
@@ -272,11 +272,10 @@ public class SerialChannel extends AbstractNioByteChannel implements DuplexChann
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
- ByteBuf byteBuf = null;
boolean close = false;
try {
do {
- byteBuf = allocHandle.allocate(allocator);
+ ByteBuf byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
@@ -293,7 +292,6 @@ public class SerialChannel extends AbstractNioByteChannel implements DuplexChann
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
- byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
@@ -306,6 +304,7 @@ public class SerialChannel extends AbstractNioByteChannel implements DuplexChann
} catch (Throwable t) {
// TODO
// handleReadException(pipeline, byteBuf, t, close, allocHandle);
+ t.printStackTrace();
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
diff --git a/protocols/firmata/src/main/resources/protocols/firmata/firmata.mspec b/protocols/firmata/src/main/resources/protocols/firmata/firmata.mspec
index 41e087f..0a522c7 100644
--- a/protocols/firmata/src/main/resources/protocols/firmata/firmata.mspec
+++ b/protocols/firmata/src/main/resources/protocols/firmata/firmata.mspec
@@ -85,6 +85,7 @@
[discriminator uint 8 'commandType']
[typeSwitch 'commandType'
['0x00' SysexCommandExendedId
+ [array int 8 'id' count '2']
]
['0x69' SysexCommandAnalogMappingQuery
[simple uint 8 'pin']
diff --git a/sandbox/test-java-firmata-driver/pom.xml b/sandbox/test-java-firmata-driver/pom.xml
index 466abfd..15c1f3a 100644
--- a/sandbox/test-java-firmata-driver/pom.xml
+++ b/sandbox/test-java-firmata-driver/pom.xml
@@ -84,6 +84,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ </dependency>
<dependency>
<groupId>com.github.purejavacomm</groupId>
@@ -120,11 +124,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<scope>test</scope>
diff --git a/sandbox/test-java-firmata-driver/src/main/java/org/apache/plc4x/java/firmata/readwrite/FirmataDriver.java b/sandbox/test-java-firmata-driver/src/main/java/org/apache/plc4x/java/firmata/readwrite/FirmataDriver.java
index 8bf6f63..3fc69aa 100644
--- a/sandbox/test-java-firmata-driver/src/main/java/org/apache/plc4x/java/firmata/readwrite/FirmataDriver.java
+++ b/sandbox/test-java-firmata-driver/src/main/java/org/apache/plc4x/java/firmata/readwrite/FirmataDriver.java
@@ -19,6 +19,8 @@ under the License.
package org.apache.plc4x.java.firmata.readwrite;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.commons.codec.binary.Hex;
import org.apache.plc4x.java.api.PlcDriver;
import org.apache.plc4x.java.firmata.readwrite.configuration.FirmataConfiguration;
import org.apache.plc4x.java.firmata.readwrite.context.FirmataDriverContext;
@@ -32,6 +34,9 @@ import org.apache.plc4x.java.spi.connection.ProtocolStackConfigurer;
import org.apache.plc4x.java.spi.connection.SingleProtocolStackConfigurer;
import org.osgi.service.component.annotations.Component;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Consumer;
import java.util.function.ToIntFunction;
@Component(service = PlcDriver.class, immediate = true)
@@ -78,6 +83,7 @@ public class FirmataDriver extends GeneratedDriverBase<FirmataMessage> {
.withProtocol(FirmataProtocolLogic.class)
.withDriverContext(FirmataDriverContext.class)
.withPacketSizeEstimator(ByteLengthEstimator.class)
+ .withCorruptPacketRemover(CorruptPackageCleaner.class)
.build();
}
@@ -85,8 +91,10 @@ public class FirmataDriver extends GeneratedDriverBase<FirmataMessage> {
public static class ByteLengthEstimator implements ToIntFunction<ByteBuf> {
@Override
public int applyAsInt(ByteBuf byteBuf) {
+ ByteBuf tmp = Unpooled.buffer(1024);
if (byteBuf.readableBytes() >= 1) {
int type = byteBuf.getByte(byteBuf.readerIndex()) & 0xF0;
+ tmp.writeByte(byteBuf.getByte(byteBuf.readerIndex()));
switch (type) {
case 0xE0:
case 0x90: return 3;
@@ -96,15 +104,23 @@ public class FirmataDriver extends GeneratedDriverBase<FirmataMessage> {
int commandType = byteBuf.getByte(byteBuf.readerIndex()) & 0x0F;
switch (commandType) {
case 0x00: {
- int curPos = 1;
- // As long as there are more bytes available and we haven't found the terminating char, continue ...
- while((byteBuf.readableBytes() > 1) && (byteBuf.getByte(byteBuf.readerIndex() + curPos) != (byte) 0xF7)) {
- curPos++;
- }
- if(byteBuf.getByte(byteBuf.readerIndex() + curPos) == (byte) 0xF7) {
- return curPos + 1;
- } else {
- return -1;
+ try {
+ int curPos = 1;
+ // As long as there are more bytes available and we haven't found the terminating char, continue ...
+ while ((byteBuf.readableBytes() > curPos + 1) && (byteBuf.getByte(byteBuf.readerIndex() + curPos) != (byte) 0xF7)) {
+ tmp.writeByte(byteBuf.getByte(byteBuf.readerIndex() + curPos));
+ curPos++;
+ }
+ if (byteBuf.getByte(byteBuf.readerIndex() + curPos) == (byte) 0xF7) {
+ tmp.writeByte(byteBuf.getByte(byteBuf.readerIndex() + curPos));
+ return curPos + 1;
+ } else {
+ return -1;
+ }
+ } catch(Exception e) {
+ byte[] data = new byte[tmp.readableBytes()];
+ tmp.readBytes(data);
+ System.out.println("Error processing: " + Hex.encodeHexString(data));
}
}
case 0x04:
@@ -122,4 +138,25 @@ public class FirmataDriver extends GeneratedDriverBase<FirmataMessage> {
}
}
+    /**
+     * Discards bytes from the front of a buffer until the next readable byte could be the
+     * start of a message, i.e. until the upper nibble of that byte is one of the potential
+     * message type indicators (0xE0, 0x90, 0xC0, 0xD0 or 0xF0).
+     */
+    public static class CorruptPackageCleaner implements Consumer<ByteBuf> {
+
+        /** Upper-nibble values that may introduce a message. */
+        static final Set<Byte> commands = new HashSet<>();
+
+        // Fill the shared set exactly once when the class is loaded. An instance initializer
+        // would leave the static set empty until the first instance is created and would
+        // re-run the adds for every further instance.
+        static {
+            commands.add((byte) 0xE0);
+            commands.add((byte) 0x90);
+            commands.add((byte) 0xC0);
+            commands.add((byte) 0xD0);
+            commands.add((byte) 0xF0);
+        }
+
+        /**
+         * Advances the reader index past every byte that cannot start a message. Stops at the
+         * first candidate start byte, or when no readable bytes remain.
+         *
+         * @param byteBuf buffer whose leading bytes should be skipped
+         */
+        @Override
+        public void accept(ByteBuf byteBuf) {
+            // getUnsignedByte takes an absolute index, so the byte to test has to be addressed
+            // through the reader index; index 0 would keep testing the same byte while the
+            // loop consumes data. The isReadable guard ends the loop on a drained buffer
+            // instead of reading past its end.
+            while (byteBuf.isReadable()
+                && !commands.contains((byte) (byteBuf.getUnsignedByte(byteBuf.readerIndex()) & 0xF0))) {
+                // Just consume the bytes till the next possible start position.
+                byteBuf.readByte();
+            }
+        }
+    }
+
}
diff --git a/sandbox/test-java-firmata-driver/src/main/java/org/apache/plc4x/java/firmata/readwrite/protocol/FirmataProtocolLogic.java b/sandbox/test-java-firmata-driver/src/main/java/org/apache/plc4x/java/firmata/readwrite/protocol/FirmataProtocolLogic.java
index 4b3dfed..51d295c 100644
--- a/sandbox/test-java-firmata-driver/src/main/java/org/apache/plc4x/java/firmata/readwrite/protocol/FirmataProtocolLogic.java
+++ b/sandbox/test-java-firmata-driver/src/main/java/org/apache/plc4x/java/firmata/readwrite/protocol/FirmataProtocolLogic.java
@@ -26,10 +26,13 @@ import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.api.value.PlcBoolean;
+import org.apache.plc4x.java.api.value.PlcInteger;
import org.apache.plc4x.java.api.value.PlcList;
+import org.apache.plc4x.java.api.value.PlcValue;
import org.apache.plc4x.java.firmata.readwrite.*;
import org.apache.plc4x.java.firmata.readwrite.context.FirmataDriverContext;
import org.apache.plc4x.java.firmata.readwrite.field.FirmataField;
+import org.apache.plc4x.java.firmata.readwrite.field.FirmataFieldAnalog;
import org.apache.plc4x.java.firmata.readwrite.field.FirmataFieldDigital;
import org.apache.plc4x.java.firmata.readwrite.model.FirmataSubscriptionHandle;
import org.apache.plc4x.java.spi.ConversationContext;
@@ -117,44 +120,35 @@ public class FirmataProtocolLogic extends Plc4xProtocolBase<FirmataMessage> impl
@Override
protected void decode(ConversationContext<FirmataMessage> context, FirmataMessage msg) throws Exception {
- if(msg instanceof FirmataMessageCommand) {
- // Ignore ... for now ...
- } else {
- if(msg instanceof FirmataMessageAnalogIO) {
- // Analog values are single value messages (Value for one port only)
- FirmataMessageAnalogIO analogIO = (FirmataMessageAnalogIO) msg;
- int pin = analogIO.getPin();
- int analogValue = getAnalogValue(analogIO.getData());
- // If this is the first value, just add it.
- if(analogValues.get(pin) == null) {
- analogValues.put(pin, new AtomicInteger(analogValue));
- // TODO: Send an changed event ...
- }
- // If there was a value before and this is different to the current one, update it.
- else if(analogValue != analogValues.get(pin).intValue()) {
- analogValues.get(pin).set(analogValue);
- // TODO: Send an changed event ...
- }
- } else if(msg instanceof FirmataMessageDigitalIO) {
- // Digital values come 8 pins together (ignoring the pin value, which is always 0).
- FirmataMessageDigitalIO digitalIO = (FirmataMessageDigitalIO) msg;
- BitSet newDigitalValues = getDigitalValues(digitalIO.getPinBlock(), digitalIO.getData());
-
- // Compare the currently set bits with the ones from the last time to see what's changed.
- BitSet changedBits = new BitSet();
- for (int i = 0; i < 8; i++) {
- int bitPos = i + (8 * digitalIO.getPinBlock());
- if (digitalValues.get(bitPos) != newDigitalValues.get(bitPos)) {
- changedBits.set(bitPos, true);
- digitalValues.set(bitPos, newDigitalValues.get(bitPos));
- }
- }
+ if(msg instanceof FirmataMessageAnalogIO) {
+ // Analog values are single value messages (Value for one port only)
+ FirmataMessageAnalogIO analogIO = (FirmataMessageAnalogIO) msg;
+ int pin = analogIO.getPin();
+ int analogValue = getAnalogValue(analogIO.getData());
+ // If this is the first value, or the value changed, send update events..
+ if((analogValues.get(pin) == null) || (analogValue != analogValues.get(pin).intValue())) {
+ analogValues.put(pin, new AtomicInteger(analogValue));
+ publishAnalogEvents(pin, analogValue);
+ }
+ } else if(msg instanceof FirmataMessageDigitalIO) {
+ // Digital values come 8 pins together (ignoring the pin value, which is always 0).
+ FirmataMessageDigitalIO digitalIO = (FirmataMessageDigitalIO) msg;
+ BitSet newDigitalValues = getDigitalValues(digitalIO.getPinBlock(), digitalIO.getData());
- // Send out update events.
- publishDigitalEvents(changedBits, digitalValues);
- } else {
- LOGGER.debug(String.format("Unexpected message %s", msg.toString()));
+ // Compare the currently set bits with the ones from the last time to see what's changed.
+ BitSet changedBits = new BitSet();
+ for (int i = 0; i < 8; i++) {
+ int bitPos = i + (8 * digitalIO.getPinBlock());
+ if (digitalValues.get(bitPos) != newDigitalValues.get(bitPos)) {
+ changedBits.set(bitPos, true);
+ digitalValues.set(bitPos, newDigitalValues.get(bitPos));
+ }
}
+
+ // Send out update events.
+ publishDigitalEvents(changedBits, digitalValues);
+ } else {
+ LOGGER.debug(String.format("Unexpected message %s", msg.toString()));
}
}
@@ -177,6 +171,41 @@ public class FirmataProtocolLogic extends Plc4xProtocolBase<FirmataMessage> impl
consumers.remove(consumerRegistration);
}
+    /**
+     * Sends an update event to every registered consumer that holds an analog subscription
+     * covering the given pin. The event contains one value per pin of the subscribed range,
+     * taken from the last known analog values; pins without a recorded value are reported
+     * as -1.
+     *
+     * @param pin   pin whose value was just updated
+     * @param value new value of that pin (the published values are read from analogValues)
+     */
+    protected void publishAnalogEvents(int pin, int value) {
+        // Try sending the subscription event to all listeners.
+        for (Map.Entry<DefaultPlcConsumerRegistration, Consumer<PlcSubscriptionEvent>> entry : consumers.entrySet()) {
+            final DefaultPlcConsumerRegistration registration = entry.getKey();
+            final Consumer<PlcSubscriptionEvent> consumer = entry.getValue();
+            for (InternalPlcSubscriptionHandle handle : registration.getAssociatedHandles()) {
+                if (!(handle instanceof FirmataSubscriptionHandle)) {
+                    continue;
+                }
+                FirmataSubscriptionHandle subscriptionHandle = (FirmataSubscriptionHandle) handle;
+                // Only analog subscriptions are of interest here.
+                if (!(subscriptionHandle.getField() instanceof FirmataFieldAnalog)) {
+                    continue;
+                }
+                FirmataFieldAnalog analogField = (FirmataFieldAnalog) subscriptionHandle.getField();
+                final int firstPin = analogField.getAddress();
+                final int endPin = firstPin + analogField.getQuantity();
+                // The field covers the pins [firstPin, endPin). The upper bound is exclusive,
+                // exactly like the loop below, so a change on pin "endPin" must not trigger
+                // an event for this subscription.
+                if ((pin < firstPin) || (pin >= endPin)) {
+                    continue;
+                }
+                // Build an update event containing the current values for all subscribed pins.
+                List<PlcValue> values = new ArrayList<>(analogField.getQuantity());
+                for (int i = firstPin; i < endPin; i++) {
+                    if (analogValues.containsKey(i)) {
+                        values.add(new PlcInteger(analogValues.get(i).intValue()));
+                    }
+                    // This could be the case if only some of the requested array values are available
+                    else {
+                        values.add(new PlcInteger(-1));
+                    }
+                }
+                sendUpdateEvents(consumer, subscriptionHandle.getName(), values);
+            }
+        }
+    }
+
protected void publishDigitalEvents(BitSet changedBits, BitSet bitValues) {
// If nothing changed, no need to do anything.
if(changedBits.cardinality() == 0) {
@@ -197,24 +226,11 @@ public class FirmataProtocolLogic extends Plc4xProtocolBase<FirmataMessage> impl
// If at least one bit of the current subscription changed it's value,
// send out an update event with all of its current values.
if(digitalField.getBitSet().intersects(changedBits)) {
- List<PlcBoolean> values = new ArrayList<>(digitalField.getBitSet().cardinality());
+ List<PlcValue> values = new ArrayList<>(digitalField.getBitSet().cardinality());
for(int i = 0; i < digitalField.getBitSet().length(); i++) {
values.add(new PlcBoolean(bitValues.get(i)));
}
- // If it's just one element, return this as a direct PlcValue
- if(values.size() == 1) {
- final PlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(Instant.now(),
- Collections.singletonMap(subscriptionHandle.getName(),
- Pair.of(PlcResponseCode.OK, values.get(0))));
- consumer.accept(event);
- }
- // If it's more, return a PlcList instead.
- else {
- final PlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(Instant.now(),
- Collections.singletonMap(subscriptionHandle.getName(),
- Pair.of(PlcResponseCode.OK, new PlcList(values))));
- consumer.accept(event);
- }
+ sendUpdateEvents(consumer, subscriptionHandle.getName(), values);
}
}
}
@@ -222,8 +238,25 @@ public class FirmataProtocolLogic extends Plc4xProtocolBase<FirmataMessage> impl
}
}
+    /**
+     * Wraps the given values in a subscription event for one field and hands it to the consumer.
+     *
+     * @param consumer  receiver of the event
+     * @param fieldName name under which the value is stored in the event
+     * @param values    values to publish
+     */
+    protected void sendUpdateEvents(Consumer<PlcSubscriptionEvent> consumer, String fieldName, List<PlcValue> values) {
+        // Exactly one element is published as a direct PlcValue, anything else as a PlcList.
+        final PlcValue payload = (values.size() == 1) ? values.get(0) : new PlcList(values);
+        consumer.accept(new DefaultPlcSubscriptionEvent(Instant.now(),
+            Collections.singletonMap(fieldName, Pair.of(PlcResponseCode.OK, payload))));
+    }
+
protected int getAnalogValue(byte[] data) {
- return 0;
+ // In Firmata analog values are encoded as a 14 bit integer: bits 0-6 of the first byte hold the
+ // least significant 7 bits and bits 0-6 of the second byte hold the most significant 7 bits.
+ // Each byte is masked to 7 bits before combining, because a Java byte with its high bit set
+ // sign-extends when widened to int and would otherwise set bits outside the 14 bit value.
+ return (data[0] & 0x7F) | ((data[1] & 0x7F) << 7);
}
protected int convertToSingleByteRepresentation(byte[] data) {