You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2020/09/08 10:30:22 UTC

[camel-kafka-connector] 02/02: Adjust the CamelSourceTask code to reduce CPU usage and heap allocations

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 0f26f989daeae86eda0073a3e33864fdffc81d3d
Author: Otavio R. Piske <an...@gmail.com>
AuthorDate: Fri Sep 4 20:34:54 2020 +0200

    Adjust the CamelSourceTask code to reduce CPU usage and heap allocations
    
    This modifies the code so that it blocks while waiting for the messages
    to arrive while also respecting the maxPollDuration.
---
 .../camel/kafkaconnector/CamelSourceTask.java      | 80 ++++++++++++----------
 1 file changed, 43 insertions(+), 37 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index d4e0810..e825e49 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -119,53 +119,59 @@ public class CamelSourceTask extends SourceTask {
         }
     }
 
+    private long remaining(long startPollEpochMilli, long maxPollDuration)  {
+        return maxPollDuration - (Instant.now().toEpochMilli() - startPollEpochMilli);
+    }
+
+
     @Override
     public synchronized List<SourceRecord> poll() {
-        long startPollEpochMilli = Instant.now().toEpochMilli();
+        final long startPollEpochMilli = Instant.now().toEpochMilli();
+
+        long remaining = remaining(startPollEpochMilli, maxPollDuration);
         long collectedRecords = 0L;
 
         List<SourceRecord> records = new ArrayList<>();
-        while (collectedRecords < maxBatchPollSize && (Instant.now().toEpochMilli() - startPollEpochMilli) < maxPollDuration) {
-            Exchange exchange = consumer.receiveNoWait();
-
-            if (exchange != null) {
-                LOG.debug("Received Exchange {} with Message {} from Endpoint {}", exchange.getExchangeId(), exchange.getMessage().getMessageId(), exchange.getFromEndpoint());
-
-                // TODO: see if there is a better way to use sourcePartition
-                // an sourceOffset
-                Map<String, String> sourcePartition = Collections.singletonMap("filename", exchange.getFromEndpoint().toString());
-                Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());
-
-                final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null;
-                final Object messageBodyValue = exchange.getMessage().getBody();
-
-                final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
-                final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null;
-
-                for (String singleTopic : topics) {
-                    SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, singleTopic, messageKeySchema, messageHeaderKey, messageBodySchema, messageBodyValue);
-                    if (exchange.getMessage().hasHeaders()) {
-                        setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
-                    }
-                    if (exchange.hasProperties()) {
-                        setAdditionalHeaders(record, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);
-                    }
-
-                    TaskHelper.logRecordContent(LOG, record, config);
-                    records.add(record);
-                }
-                collectedRecords++;
-            } else {
+        while (collectedRecords < maxBatchPollSize && remaining > 0) {
+            Exchange exchange = consumer.receive(remaining);
+            if (exchange == null) {
+                // Nothing received, abort and return what we received so far
                 break;
             }
-        }
 
-        if (records.isEmpty()) {
-            return Collections.EMPTY_LIST;
-        } else {
-            return records;
+            LOG.debug("Received Exchange {} with Message {} from Endpoint {}", exchange.getExchangeId(),
+                    exchange.getMessage().getMessageId(), exchange.getFromEndpoint());
+
+            // TODO: see if there is a better way to use sourcePartition
+            // an sourceOffset
+            Map<String, String> sourcePartition = Collections.singletonMap("filename", exchange.getFromEndpoint().toString());
+            Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());
+
+            final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null;
+            final Object messageBodyValue = exchange.getMessage().getBody();
+
+            final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
+            final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null;
+
+            for (String singleTopic : topics) {
+                SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, singleTopic, messageKeySchema,
+                        messageHeaderKey, messageBodySchema, messageBodyValue);
+
+                if (exchange.getMessage().hasHeaders()) {
+                    setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
+                }
+                if (exchange.hasProperties()) {
+                    setAdditionalHeaders(record, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);
+                }
+
+                TaskHelper.logRecordContent(LOG, record, config);
+                records.add(record);
+            }
+            collectedRecords++;
+            remaining = remaining(startPollEpochMilli, maxPollDuration);
         }
 
+        return records.isEmpty() ? null : records;
     }
 
     @Override