You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by hu...@apache.org on 2021/01/20 10:43:33 UTC

[plc4x] branch add_encryption_handler_opcua updated: Finialized OPCUA server and updated Kafka connector

This is an automated email from the ASF dual-hosted git repository.

hutcheb pushed a commit to branch add_encryption_handler_opcua
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/add_encryption_handler_opcua by this push:
     new 184fafb  Finialized OPCUA server and updated Kafka connector
184fafb is described below

commit 184fafbd365c3d219cb718312995b736d1c6f548
Author: hutcheb <be...@gmail.com>
AuthorDate: Wed Jan 20 05:43:13 2021 -0500

    Finialized OPCUA server and updated Kafka connector
    
    Fixed opcua server issue when polling errors occur. Now polling doesn't
    stop for other tags.
    
    Updated kafka conector based on Rankesh's comment about adding jitter to
    the poll return time.
---
 plc4j/integrations/apache-kafka/pom.xml            |   5 +
 .../org/apache/plc4x/kafka/Plc4xSinkConnector.java |   1 -
 .../java/org/apache/plc4x/kafka/Plc4xSinkTask.java |   2 -
 .../apache/plc4x/kafka/Plc4xSourceConnector.java   |   3 -
 .../org/apache/plc4x/kafka/Plc4xSourceTask.java    |   5 +-
 .../opcuaserver/backend/Plc4xCommunication.java    | 170 ++++++++++++---------
 .../java/opcuaserver/backend/Plc4xNamespace.java   |   2 +-
 7 files changed, 108 insertions(+), 80 deletions(-)

diff --git a/plc4j/integrations/apache-kafka/pom.xml b/plc4j/integrations/apache-kafka/pom.xml
index 7dc6405..d836c49 100644
--- a/plc4j/integrations/apache-kafka/pom.xml
+++ b/plc4j/integrations/apache-kafka/pom.xml
@@ -266,6 +266,11 @@
       <scope>runtime</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+
     <!--dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>connect-runtime</artifactId>
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
index 0cca336..50e6f51 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
@@ -18,7 +18,6 @@ under the License.
 */
 package org.apache.plc4x.kafka;
 
-import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigValue;
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
index 5d283aa..789ce9d 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
@@ -34,8 +34,6 @@ import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
 import org.apache.plc4x.kafka.config.Constants;
 import org.apache.plc4x.kafka.util.VersionUtil;
 
-import java.util.concurrent.ExecutionException;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
index c981c18..6088a8a 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
@@ -20,12 +20,9 @@ package org.apache.plc4x.kafka;
 
 import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
-import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.kafka.config.Field;
 import org.apache.plc4x.kafka.config.*;
 import org.apache.plc4x.kafka.util.VersionUtil;
diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
index 32486fb..00409b8 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
@@ -18,6 +18,7 @@ under the License.
 */
 package org.apache.plc4x.kafka;
 
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.data.*;
@@ -43,8 +44,6 @@ import java.math.BigDecimal;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
-import java.time.Duration;
-import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.TimeUnit;
@@ -243,7 +242,7 @@ public class Plc4xSourceTask extends SourceTask {
         } else {
             try {
                 List<SourceRecord> result = new ArrayList<>(1);
-                SourceRecord temp = buffer.poll(pollReturnInterval, TimeUnit.MILLISECONDS);
+                SourceRecord temp = buffer.poll(pollReturnInterval + RandomUtils.nextInt(0, (int) Math.round(pollReturnInterval*0.05)), TimeUnit.MILLISECONDS);
                 if (temp == null) {
                     return null;
                 }
diff --git a/plc4j/integrations/opcua-server/src/main/java/org/apache/plc4x/java/opcuaserver/backend/Plc4xCommunication.java b/plc4j/integrations/opcua-server/src/main/java/org/apache/plc4x/java/opcuaserver/backend/Plc4xCommunication.java
index 1c717c8..6db9aeb 100644
--- a/plc4j/integrations/opcua-server/src/main/java/org/apache/plc4x/java/opcuaserver/backend/Plc4xCommunication.java
+++ b/plc4j/integrations/opcua-server/src/main/java/org/apache/plc4x/java/opcuaserver/backend/Plc4xCommunication.java
@@ -21,6 +21,8 @@ package org.apache.plc4x.java.opcuaserver.backend;
 
 import java.lang.reflect.Array;
 import java.util.Arrays;
+
+import org.eclipse.milo.opcua.sdk.server.AbstractLifecycle;
 import org.eclipse.milo.opcua.sdk.server.api.DataItem;
 import org.eclipse.milo.opcua.sdk.server.nodes.filters.AttributeFilterContext;
 import org.eclipse.milo.opcua.stack.core.Identifiers;
@@ -29,6 +31,7 @@ import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
 import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
 import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
 
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.ULong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,7 +59,7 @@ import java.math.BigInteger;
 import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.ulong;
 
 
-public class Plc4xCommunication {
+public class Plc4xCommunication extends AbstractLifecycle {
 
     private PlcDriverManager driverManager;
     private final Logger logger = LoggerFactory.getLogger(getClass());
@@ -68,9 +71,19 @@ public class Plc4xCommunication {
     Map<NodeId, DataItem> monitoredList = new HashMap<>();
 
     public Plc4xCommunication () {
+
+    }
+
+    @Override
+    protected void onStartup() {
         driverManager = new PooledPlcDriverManager();
     }
 
+    @Override
+    protected void onShutdown() {
+
+    }
+
     public PlcDriverManager getDriverManager() {
         return driverManager;
     }
@@ -156,93 +169,110 @@ public class Plc4xCommunication {
     }
 
     public DataValue getValue(AttributeFilterContext.GetAttributeContext ctx, String tag, String connectionString) {
-        PlcConnection connection = null;
         DataValue resp = new DataValue(new Variant(null), StatusCode.BAD);
+        PlcConnection connection = null;
+        try {
 
-        //Check if we just polled the connection and it failed. Wait for the backoff counter to expire before we try again.
-        if (failedConnectionList.containsKey(connectionString)) {
-            if (System.currentTimeMillis() > failedConnectionList.get(connectionString) + DEFAULT_RETRY_BACKOFF) {
-                failedConnectionList.remove(connectionString);
-            } else {
-                logger.debug("Waiting for back off timer - " + ((failedConnectionList.get(connectionString) + DEFAULT_RETRY_BACKOFF) - System.currentTimeMillis()) + " ms left");
-                return resp;
+            //Check if we just polled the connection and it failed. Wait for the backoff counter to expire before we try again.
+            if (failedConnectionList.containsKey(connectionString)) {
+                if (System.currentTimeMillis() > failedConnectionList.get(connectionString) + DEFAULT_RETRY_BACKOFF) {
+                    failedConnectionList.remove(connectionString);
+                } else {
+                    logger.debug("Waiting for back off timer - " + ((failedConnectionList.get(connectionString) + DEFAULT_RETRY_BACKOFF) - System.currentTimeMillis()) + " ms left");
+                    return resp;
+                }
             }
-        }
 
-        //Try to connect to PLC
-        try {
-            connection = driverManager.getConnection(connectionString);            
-            logger.debug(connectionString + " Connected");
-        } catch (PlcConnectionException e) {
-            logger.error("Failed to connect to device, error raised - " + e);
-            failedConnectionList.put(connectionString, System.currentTimeMillis());
-            return resp;
-        }
-
-        if (!connection.getMetadata().canRead()) {
-            logger.error("This connection doesn't support reading.");
+            //Try to connect to PLC
             try {
-                connection.close();
-            } catch (Exception exception) {
-                logger.warn("Closing connection failed with error - " + exception);
+                connection = driverManager.getConnection(connectionString);
+                logger.debug(connectionString + " Connected");
+            } catch (PlcConnectionException e) {
+                logger.error("Failed to connect to device, error raised - " + e);
+                failedConnectionList.put(connectionString, System.currentTimeMillis());
+                return resp;
             }
-            return resp;
-        }
 
-        long timeout = DEFAULT_TIMEOUT;
-        if (monitoredList.containsKey(ctx.getNode().getNodeId())) {
-            timeout = (long) monitoredList.get(ctx.getNode().getNodeId()).getSamplingInterval()*1000;
-        }
+            if (!connection.getMetadata().canRead()) {
+                logger.error("This connection doesn't support reading.");
+                try {
+                    connection.close();
+                } catch (Exception exception) {
+                    logger.warn("Closing connection failed with error - " + exception);
+                }
+                return resp;
+            }
 
-        // Create a new read request:
-        // - Give the single item requested an alias name
-        PlcReadRequest.Builder builder = connection.readRequestBuilder();
-        builder.addItem("value-1", tag);
-        PlcReadRequest readRequest = builder.build();
+            long timeout = DEFAULT_TIMEOUT;
+            if (monitoredList.containsKey(ctx.getNode().getNodeId())) {
+                timeout = (long) monitoredList.get(ctx.getNode().getNodeId()).getSamplingInterval() * 1000;
+            }
 
-        PlcReadResponse response = null;
-        try {
-            response = readRequest.execute().get(timeout, TimeUnit.MICROSECONDS);
-        } catch (InterruptedException | ExecutionException | TimeoutException e) {
-            logger.warn(e + " Occurred while reading value, using timeout of " + timeout/1000 + "ms");
+            // Create a new read request:
+            // - Give the single item requested an alias name
+            PlcReadRequest.Builder builder = connection.readRequestBuilder();
+            builder.addItem("value-1", tag);
+            PlcReadRequest readRequest = builder.build();
+
+            PlcReadResponse response = null;
             try {
-                connection.close();
-            } catch (Exception exception) {
-                logger.warn("Closing connection failed with error - " + exception);
+                response = readRequest.execute().get(timeout, TimeUnit.MICROSECONDS);
+            } catch (InterruptedException | ExecutionException | TimeoutException e) {
+                logger.warn(e + " Occurred while reading value, using timeout of " + timeout / 1000 + "ms");
+                try {
+                    connection.close();
+                } catch (Exception exception) {
+                    logger.warn("Closing connection failed with error - " + exception);
+                }
+                return resp;
             }
-            return resp;
-        }
 
-        for (String fieldName : response.getFieldNames()) {
-          if(response.getResponseCode(fieldName) == PlcResponseCode.OK) {
-              int numValues = response.getNumberOfValues(fieldName);
-              if(numValues == 1) {
-                  if (response.getObject(fieldName) instanceof BigInteger) {
-                      resp = new DataValue(new Variant(ulong((BigInteger) response.getObject(fieldName))), StatusCode.GOOD);
-                  } else {
-                      resp = new DataValue(new Variant(response.getObject(fieldName)), StatusCode.GOOD);
-                  }
-              } else {
-                Object array = Array.newInstance(response.getObject(fieldName, 0).getClass(), numValues);
-                for (int i = 0; i < numValues; i++) {
-                    if (response.getObject(fieldName, i) instanceof BigInteger) {
-                        Array.set(array, i, ulong((BigInteger) response.getObject(fieldName, i)));
+            for (String fieldName : response.getFieldNames()) {
+                if (response.getResponseCode(fieldName) == PlcResponseCode.OK) {
+                    int numValues = response.getNumberOfValues(fieldName);
+                    if (numValues == 1) {
+                        if (response.getObject(fieldName) instanceof BigInteger) {
+                            resp = new DataValue(new Variant(ulong((BigInteger) response.getObject(fieldName))), StatusCode.GOOD);
+                        } else {
+                            resp = new DataValue(new Variant(response.getObject(fieldName)), StatusCode.GOOD);
+                        }
                     } else {
-                        Array.set(array, i, response.getObject(fieldName, i));
+                        Object array = null;
+                        if (response.getObject(fieldName, 0) instanceof BigInteger) {
+                            array = Array.newInstance(ULong.class, numValues);
+                        } else {
+                            array = Array.newInstance(response.getObject(fieldName, 0).getClass(), numValues);
+                        }
+                        for (int i = 0; i < numValues; i++) {
+                            if (response.getObject(fieldName, i) instanceof BigInteger) {
+                                Array.set(array, i, ulong((BigInteger) response.getObject(fieldName, i)));
+                            } else {
+                                Array.set(array, i, response.getObject(fieldName, i));
+                            }
+                        }
+                        resp = new DataValue(new Variant(array), StatusCode.GOOD);
                     }
                 }
-                resp = new DataValue(new Variant(array), StatusCode.GOOD);
-              }
-          }
-        }
+            }
 
-        try {
-          connection.close();
+            try {
+                connection.close();
+            } catch (Exception e) {
+                failedConnectionList.put(connectionString, System.currentTimeMillis());
+                logger.warn("Closing connection failed with error " + e);
+            }
+            return resp;
         } catch (Exception e) {
-          failedConnectionList.put(connectionString, System.currentTimeMillis());
-          logger.warn("Closing connection failed with error " + e);
+            logger.warn("General error reading value " + e.getStackTrace()[0].toString());
+            if (connection != null) {
+                try {
+                    connection.close();
+                } catch (Exception ex) {
+                    //Do Nothing
+                }
+            }
+            return resp;
         }
-        return resp;
     }
 
     public void setValue(String tag, String value, String connectionString) {
diff --git a/plc4j/integrations/opcua-server/src/main/java/org/apache/plc4x/java/opcuaserver/backend/Plc4xNamespace.java b/plc4j/integrations/opcua-server/src/main/java/org/apache/plc4x/java/opcuaserver/backend/Plc4xNamespace.java
index 12e3313..53c3be9 100644
--- a/plc4j/integrations/opcua-server/src/main/java/org/apache/plc4x/java/opcuaserver/backend/Plc4xNamespace.java
+++ b/plc4j/integrations/opcua-server/src/main/java/org/apache/plc4x/java/opcuaserver/backend/Plc4xNamespace.java
@@ -64,13 +64,13 @@ public class Plc4xNamespace extends ManagedNamespaceWithLifecycle {
 
     public Plc4xNamespace(OpcUaServer server, Configuration c) {
         super(server, APPLICATIONID);
-
         this.config = c;
         subscriptionModel = new SubscriptionModel(server, this);
         dictionaryManager = new DataTypeDictionaryManager(getNodeContext(), APPLICATIONID);
         plc4xServer = new Plc4xCommunication();
         getLifecycleManager().addLifecycle(dictionaryManager);
         getLifecycleManager().addLifecycle(subscriptionModel);
+        getLifecycleManager().addLifecycle(plc4xServer);
         getLifecycleManager().addStartupTask(this::addNodes);
     }