You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2020/03/10 14:58:37 UTC

[plc4x] branch develop updated: - Added the analog input subscription functionality

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new e52774a  - Added the analog input subscription functionality
e52774a is described below

commit e52774ac236a12eacfd6003a992eed82019267dd
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Tue Mar 10 15:58:29 2020 +0100

    - Added the analog input subscription functionality
---
 .../plc4x/java/transport/serial/SerialChannel.java |   5 +-
 .../main/resources/protocols/firmata/firmata.mspec |   1 +
 sandbox/test-java-firmata-driver/pom.xml           |   9 +-
 .../java/firmata/readwrite/FirmataDriver.java      |  55 +++++++--
 .../readwrite/protocol/FirmataProtocolLogic.java   | 137 +++++++++++++--------
 5 files changed, 138 insertions(+), 69 deletions(-)

diff --git a/plc4j/transports/serial/src/main/java/org/apache/plc4x/java/transport/serial/SerialChannel.java b/plc4j/transports/serial/src/main/java/org/apache/plc4x/java/transport/serial/SerialChannel.java
index b9d5ee6..f49b0fd 100644
--- a/plc4j/transports/serial/src/main/java/org/apache/plc4x/java/transport/serial/SerialChannel.java
+++ b/plc4j/transports/serial/src/main/java/org/apache/plc4x/java/transport/serial/SerialChannel.java
@@ -272,11 +272,10 @@ public class SerialChannel extends AbstractNioByteChannel implements DuplexChann
             final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
             allocHandle.reset(config);
 
-            ByteBuf byteBuf = null;
             boolean close = false;
             try {
                 do {
-                    byteBuf = allocHandle.allocate(allocator);
+                    ByteBuf byteBuf = allocHandle.allocate(allocator);
                     allocHandle.lastBytesRead(doReadBytes(byteBuf));
                     if (allocHandle.lastBytesRead() <= 0) {
                         // nothing was read. release the buffer.
@@ -293,7 +292,6 @@ public class SerialChannel extends AbstractNioByteChannel implements DuplexChann
                     allocHandle.incMessagesRead(1);
                     readPending = false;
                     pipeline.fireChannelRead(byteBuf);
-                    byteBuf = null;
                 } while (allocHandle.continueReading());
 
                 allocHandle.readComplete();
@@ -306,6 +304,7 @@ public class SerialChannel extends AbstractNioByteChannel implements DuplexChann
             } catch (Throwable t) {
                 // TODO
                 // handleReadException(pipeline, byteBuf, t, close, allocHandle);
+                t.printStackTrace();
             } finally {
                 // Check if there is a readPending which was not processed yet.
                 // This could be for two reasons:
diff --git a/protocols/firmata/src/main/resources/protocols/firmata/firmata.mspec b/protocols/firmata/src/main/resources/protocols/firmata/firmata.mspec
index 41e087f..0a522c7 100644
--- a/protocols/firmata/src/main/resources/protocols/firmata/firmata.mspec
+++ b/protocols/firmata/src/main/resources/protocols/firmata/firmata.mspec
@@ -85,6 +85,7 @@
     [discriminator uint 8 'commandType']
     [typeSwitch 'commandType'
         ['0x00' SysexCommandExendedId
+            [array int 8 'id' count '2']
         ]
         ['0x69' SysexCommandAnalogMappingQuery
             [simple uint 8 'pin']
diff --git a/sandbox/test-java-firmata-driver/pom.xml b/sandbox/test-java-firmata-driver/pom.xml
index 466abfd..15c1f3a 100644
--- a/sandbox/test-java-firmata-driver/pom.xml
+++ b/sandbox/test-java-firmata-driver/pom.xml
@@ -84,6 +84,10 @@
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
     </dependency>
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>com.github.purejavacomm</groupId>
@@ -120,11 +124,6 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>commons-codec</groupId>
-      <artifactId>commons-codec</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
       <groupId>com.fasterxml.jackson.dataformat</groupId>
       <artifactId>jackson-dataformat-xml</artifactId>
       <scope>test</scope>
diff --git a/sandbox/test-java-firmata-driver/src/main/java/org/apache/plc4x/java/firmata/readwrite/FirmataDriver.java b/sandbox/test-java-firmata-driver/src/main/java/org/apache/plc4x/java/firmata/readwrite/FirmataDriver.java
index 8bf6f63..3fc69aa 100644
--- a/sandbox/test-java-firmata-driver/src/main/java/org/apache/plc4x/java/firmata/readwrite/FirmataDriver.java
+++ b/sandbox/test-java-firmata-driver/src/main/java/org/apache/plc4x/java/firmata/readwrite/FirmataDriver.java
@@ -19,6 +19,8 @@ under the License.
 package org.apache.plc4x.java.firmata.readwrite;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.commons.codec.binary.Hex;
 import org.apache.plc4x.java.api.PlcDriver;
 import org.apache.plc4x.java.firmata.readwrite.configuration.FirmataConfiguration;
 import org.apache.plc4x.java.firmata.readwrite.context.FirmataDriverContext;
@@ -32,6 +34,9 @@ import org.apache.plc4x.java.spi.connection.ProtocolStackConfigurer;
 import org.apache.plc4x.java.spi.connection.SingleProtocolStackConfigurer;
 import org.osgi.service.component.annotations.Component;
 
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Consumer;
 import java.util.function.ToIntFunction;
 
 @Component(service = PlcDriver.class, immediate = true)
@@ -78,6 +83,7 @@ public class FirmataDriver extends GeneratedDriverBase<FirmataMessage> {
             .withProtocol(FirmataProtocolLogic.class)
             .withDriverContext(FirmataDriverContext.class)
             .withPacketSizeEstimator(ByteLengthEstimator.class)
+            .withCorruptPacketRemover(CorruptPackageCleaner.class)
             .build();
     }
 
@@ -85,8 +91,10 @@ public class FirmataDriver extends GeneratedDriverBase<FirmataMessage> {
     public static class ByteLengthEstimator implements ToIntFunction<ByteBuf> {
         @Override
         public int applyAsInt(ByteBuf byteBuf) {
+            ByteBuf tmp = Unpooled.buffer(1024);
             if (byteBuf.readableBytes() >= 1) {
                 int type = byteBuf.getByte(byteBuf.readerIndex()) & 0xF0;
+                tmp.writeByte(byteBuf.getByte(byteBuf.readerIndex()));
                 switch (type) {
                     case 0xE0:
                     case 0x90: return 3;
@@ -96,15 +104,23 @@ public class FirmataDriver extends GeneratedDriverBase<FirmataMessage> {
                         int commandType = byteBuf.getByte(byteBuf.readerIndex()) & 0x0F;
                         switch (commandType) {
                             case 0x00: {
-                                int curPos = 1;
-                                // As long as there are more bytes available and we haven't found the terminating char, continue ...
-                                while((byteBuf.readableBytes() > 1) && (byteBuf.getByte(byteBuf.readerIndex() + curPos) != (byte) 0xF7)) {
-                                    curPos++;
-                                }
-                                if(byteBuf.getByte(byteBuf.readerIndex() + curPos) == (byte) 0xF7) {
-                                    return curPos + 1;
-                                } else {
-                                    return -1;
+                                try {
+                                    int curPos = 1;
+                                    // As long as there are more bytes available and we haven't found the terminating char, continue ...
+                                    while ((byteBuf.readableBytes() > curPos + 1) && (byteBuf.getByte(byteBuf.readerIndex() + curPos) != (byte) 0xF7)) {
+                                        tmp.writeByte(byteBuf.getByte(byteBuf.readerIndex() + curPos));
+                                        curPos++;
+                                    }
+                                    if (byteBuf.getByte(byteBuf.readerIndex() + curPos) == (byte) 0xF7) {
+                                        tmp.writeByte(byteBuf.getByte(byteBuf.readerIndex() + curPos));
+                                        return curPos + 1;
+                                    } else {
+                                        return -1;
+                                    }
+                                } catch(Exception e) {
+                                    byte[] data = new byte[tmp.readableBytes()];
+                                    tmp.readBytes(data);
+                                    System.out.println("Error processing: " + Hex.encodeHexString(data));
                                 }
                             }
                             case 0x04:
@@ -122,4 +138,25 @@ public class FirmataDriver extends GeneratedDriverBase<FirmataMessage> {
         }
     }
 
+    /** Consumes all bytes until one of the potential message type indicators is found. */
+    public static class CorruptPackageCleaner implements Consumer<ByteBuf> {
+
+        static Set<Byte> commands = new HashSet<>();
+        {
+            commands.add((byte) 0xE0);
+            commands.add((byte) 0x90);
+            commands.add((byte) 0xC0);
+            commands.add((byte) 0xD0);
+            commands.add((byte) 0xF0);
+        }
+
+        @Override
+        public void accept(ByteBuf byteBuf) {
+            while (!commands.contains((byte) (byteBuf.getUnsignedByte(0) & 0xF0))) {
+                // Just consume the bytes till the next possible start position.
+                byteBuf.readByte();
+            }
+        }
+    }
+
 }
diff --git a/sandbox/test-java-firmata-driver/src/main/java/org/apache/plc4x/java/firmata/readwrite/protocol/FirmataProtocolLogic.java b/sandbox/test-java-firmata-driver/src/main/java/org/apache/plc4x/java/firmata/readwrite/protocol/FirmataProtocolLogic.java
index 4b3dfed..51d295c 100644
--- a/sandbox/test-java-firmata-driver/src/main/java/org/apache/plc4x/java/firmata/readwrite/protocol/FirmataProtocolLogic.java
+++ b/sandbox/test-java-firmata-driver/src/main/java/org/apache/plc4x/java/firmata/readwrite/protocol/FirmataProtocolLogic.java
@@ -26,10 +26,13 @@ import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
 import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
 import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.apache.plc4x.java.api.value.PlcBoolean;
+import org.apache.plc4x.java.api.value.PlcInteger;
 import org.apache.plc4x.java.api.value.PlcList;
+import org.apache.plc4x.java.api.value.PlcValue;
 import org.apache.plc4x.java.firmata.readwrite.*;
 import org.apache.plc4x.java.firmata.readwrite.context.FirmataDriverContext;
 import org.apache.plc4x.java.firmata.readwrite.field.FirmataField;
+import org.apache.plc4x.java.firmata.readwrite.field.FirmataFieldAnalog;
 import org.apache.plc4x.java.firmata.readwrite.field.FirmataFieldDigital;
 import org.apache.plc4x.java.firmata.readwrite.model.FirmataSubscriptionHandle;
 import org.apache.plc4x.java.spi.ConversationContext;
@@ -117,44 +120,35 @@ public class FirmataProtocolLogic extends Plc4xProtocolBase<FirmataMessage> impl
 
     @Override
     protected void decode(ConversationContext<FirmataMessage> context, FirmataMessage msg) throws Exception {
-        if(msg instanceof FirmataMessageCommand) {
-            // Ignore ... for now ...
-        } else {
-            if(msg instanceof FirmataMessageAnalogIO) {
-                // Analog values are single value messages (Value for one port only)
-                FirmataMessageAnalogIO analogIO = (FirmataMessageAnalogIO) msg;
-                int pin = analogIO.getPin();
-                int analogValue = getAnalogValue(analogIO.getData());
-                // If this is the first value, just add it.
-                if(analogValues.get(pin) == null) {
-                    analogValues.put(pin, new AtomicInteger(analogValue));
-                    // TODO: Send an changed event ...
-                }
-                // If there was a value before and this is different to the current one, update it.
-                else if(analogValue != analogValues.get(pin).intValue()) {
-                    analogValues.get(pin).set(analogValue);
-                    // TODO: Send an changed event ...
-                }
-            } else if(msg instanceof FirmataMessageDigitalIO) {
-                // Digital values come 8 pins together (ignoring the pin value, which is always 0).
-                FirmataMessageDigitalIO digitalIO = (FirmataMessageDigitalIO) msg;
-                BitSet newDigitalValues = getDigitalValues(digitalIO.getPinBlock(), digitalIO.getData());
-
-                // Compare the currently set bits with the ones from the last time to see what's changed.
-                BitSet changedBits = new BitSet();
-                for (int i = 0; i < 8; i++) {
-                    int bitPos = i + (8 * digitalIO.getPinBlock());
-                    if (digitalValues.get(bitPos) != newDigitalValues.get(bitPos)) {
-                        changedBits.set(bitPos, true);
-                        digitalValues.set(bitPos, newDigitalValues.get(bitPos));
-                    }
-                }
+        if(msg instanceof FirmataMessageAnalogIO) {
+            // Analog values are single value messages (Value for one port only)
+            FirmataMessageAnalogIO analogIO = (FirmataMessageAnalogIO) msg;
+            int pin = analogIO.getPin();
+            int analogValue = getAnalogValue(analogIO.getData());
+            // If this is the first value, or the value changed, send update events.
+            if((analogValues.get(pin) == null) || (analogValue != analogValues.get(pin).intValue())) {
+                analogValues.put(pin, new AtomicInteger(analogValue));
+                publishAnalogEvents(pin, analogValue);
+            }
+        } else if(msg instanceof FirmataMessageDigitalIO) {
+            // Digital values come 8 pins together (ignoring the pin value, which is always 0).
+            FirmataMessageDigitalIO digitalIO = (FirmataMessageDigitalIO) msg;
+            BitSet newDigitalValues = getDigitalValues(digitalIO.getPinBlock(), digitalIO.getData());
 
-                // Send out update events.
-                publishDigitalEvents(changedBits, digitalValues);
-            } else {
-                LOGGER.debug(String.format("Unexpected message %s", msg.toString()));
+            // Compare the currently set bits with the ones from the last time to see what's changed.
+            BitSet changedBits = new BitSet();
+            for (int i = 0; i < 8; i++) {
+                int bitPos = i + (8 * digitalIO.getPinBlock());
+                if (digitalValues.get(bitPos) != newDigitalValues.get(bitPos)) {
+                    changedBits.set(bitPos, true);
+                    digitalValues.set(bitPos, newDigitalValues.get(bitPos));
+                }
             }
+
+            // Send out update events.
+            publishDigitalEvents(changedBits, digitalValues);
+        } else {
+            LOGGER.debug(String.format("Unexpected message %s", msg.toString()));
         }
     }
 
@@ -177,6 +171,41 @@ public class FirmataProtocolLogic extends Plc4xProtocolBase<FirmataMessage> impl
         consumers.remove(consumerRegistration);
     }
 
+    protected void publishAnalogEvents(int pin, int value) {
+        // Try sending the subscription event to all listeners.
+        for (Map.Entry<DefaultPlcConsumerRegistration, Consumer<PlcSubscriptionEvent>> entry : consumers.entrySet()) {
+            final DefaultPlcConsumerRegistration registration = entry.getKey();
+            final Consumer<PlcSubscriptionEvent> consumer = entry.getValue();
+            // Only if the current data point matches the subscription, publish the event to it.
+            for (InternalPlcSubscriptionHandle handle : registration.getAssociatedHandles()) {
+                if (handle instanceof FirmataSubscriptionHandle) {
+                    FirmataSubscriptionHandle subscriptionHandle = (FirmataSubscriptionHandle) handle;
+                    // Check if the subscription matches this current event
+                    // (i.e. it is an analog field whose pin range covers the pin that changed).
+                    if (subscriptionHandle.getField() instanceof FirmataFieldAnalog) {
+                        FirmataFieldAnalog analogField = (FirmataFieldAnalog) subscriptionHandle.getField();
+                        // Check if this field would include the current pin.
+                        if((analogField.getAddress() <= pin) &&
+                            (analogField.getAddress() + analogField.getQuantity() >= pin)) {
+                            // Build an update event containing the current values for all subscribed fields.
+                            List<PlcValue> values = new ArrayList<>(analogField.getQuantity());
+                            for(int i = analogField.getAddress(); i < analogField.getAddress() + analogField.getQuantity(); i++) {
+                                if(analogValues.containsKey(i)) {
+                                    values.add(new PlcInteger(analogValues.get(i).intValue()));
+                                }
+                                // This could be the case if only some of the requested array values are available
+                                else {
+                                    values.add(new PlcInteger(-1));
+                                }
+                            }
+                            sendUpdateEvents(consumer, subscriptionHandle.getName(), values);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
     protected void publishDigitalEvents(BitSet changedBits, BitSet bitValues) {
         // If nothing changed, no need to do anything.
         if(changedBits.cardinality() == 0) {
@@ -197,24 +226,11 @@ public class FirmataProtocolLogic extends Plc4xProtocolBase<FirmataMessage> impl
                         // If at least one bit of the current subscription changed it's value,
                         // send out an update event with all of its current values.
                         if(digitalField.getBitSet().intersects(changedBits)) {
-                            List<PlcBoolean> values = new ArrayList<>(digitalField.getBitSet().cardinality());
+                            List<PlcValue> values = new ArrayList<>(digitalField.getBitSet().cardinality());
                             for(int i = 0; i < digitalField.getBitSet().length(); i++) {
                                 values.add(new PlcBoolean(bitValues.get(i)));
                             }
-                            // If it's just one element, return this as a direct PlcValue
-                            if(values.size() == 1) {
-                                final PlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(Instant.now(),
-                                    Collections.singletonMap(subscriptionHandle.getName(),
-                                        Pair.of(PlcResponseCode.OK, values.get(0))));
-                                consumer.accept(event);
-                            }
-                            // If it's more, return a PlcList instead.
-                            else {
-                                final PlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(Instant.now(),
-                                    Collections.singletonMap(subscriptionHandle.getName(),
-                                        Pair.of(PlcResponseCode.OK, new PlcList(values))));
-                                consumer.accept(event);
-                            }
+                            sendUpdateEvents(consumer, subscriptionHandle.getName(), values);
                         }
                     }
                 }
@@ -222,8 +238,25 @@ public class FirmataProtocolLogic extends Plc4xProtocolBase<FirmataMessage> impl
         }
     }
 
+    protected void sendUpdateEvents(Consumer<PlcSubscriptionEvent> consumer, String fieldName, List<PlcValue> values) {
+        // If it's just one element, return this as a direct PlcValue
+        if(values.size() == 1) {
+            final PlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(Instant.now(),
+                Collections.singletonMap(fieldName, Pair.of(PlcResponseCode.OK, values.get(0))));
+            consumer.accept(event);
+        }
+        // If it's more, return a PlcList instead.
+        else {
+            final PlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(Instant.now(),
+                Collections.singletonMap(fieldName, Pair.of(PlcResponseCode.OK, new PlcList(values))));
+            consumer.accept(event);
+        }
+    }
+
     protected int getAnalogValue(byte[] data) {
-        return 0;
+        // In Firmata analog values are encoded as a 14bit integer with the least significant bits being located in
+        // the bits 0-6 of the first byte and the second half as the 0-6 bits of the second byte.
+        return (data[0] | (data[1] << 7)) & 0xFFFF;
     }
 
     protected int convertToSingleByteRepresentation(byte[] data) {