You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by hu...@apache.org on 2020/11/03 09:42:59 UTC

[plc4x] branch bug/kafka-source-cpu created (now 5354533)

This is an automated email from the ASF dual-hosted git repository.

hutcheb pushed a change to branch bug/kafka-source-cpu
in repository https://gitbox.apache.org/repos/asf/plc4x.git.


      at 5354533  Fix Kafka Source CPU usage

This branch includes the following new commits:

     new 5354533  Fix Kafka Source CPU usage

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[plc4x] 01/01: Fix Kafka Source CPU usage

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hutcheb pushed a commit to branch bug/kafka-source-cpu
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 5354533939f903ca8632a1d4bcdcca790946415b
Author: hutcheb <be...@gmail.com>
AuthorDate: Tue Nov 3 04:41:40 2020 -0500

    Fix Kafka Source CPU usage
---
 .../src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java    | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
index f8f30cc..35d6d96 100644
--- a/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
+++ b/plc4j/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
@@ -44,6 +44,7 @@ import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Source Connector Task polling the data source at a given rate.
@@ -219,7 +220,13 @@ public class Plc4xSourceTask extends SourceTask {
             buffer.drainTo(result, numElements);
             return result;
         } else {
-            return Collections.emptyList();
+            try {
+                List<SourceRecord> result = new ArrayList<>(1);
+                result.add(buffer.poll(5000, TimeUnit.MILLISECONDS));
+                return result;
+            } catch (InterruptedException e) {
+                return null;
+            }
         }
     }