Posted to commits@camel.apache.org by GitBox <gi...@apache.org> on 2020/09/07 20:13:22 UTC

[GitHub] [camel-kafka-connector] valdar commented on a change in pull request #428: Proposed fix for issue #414

valdar commented on a change in pull request #428:
URL: https://github.com/apache/camel-kafka-connector/pull/428#discussion_r484557883



##########
File path: core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
##########
@@ -119,53 +119,63 @@ public void start(Map<String, String> props) {
         }
     }
 
+    private long remaining(long startPollEpochMilli, long maxPollDuration)  {
+        return maxPollDuration - (Instant.now().toEpochMilli() - startPollEpochMilli);
+    }
+
+
     @Override
     public synchronized List<SourceRecord> poll() {
-        long startPollEpochMilli = Instant.now().toEpochMilli();
+        final long startPollEpochMilli = Instant.now().toEpochMilli();
+
+        long remaining = remaining(startPollEpochMilli, maxPollDuration);
         long collectedRecords = 0L;
 
-        List<SourceRecord> records = new ArrayList<>();
-        while (collectedRecords < maxBatchPollSize && (Instant.now().toEpochMilli() - startPollEpochMilli) < maxPollDuration) {
-            Exchange exchange = consumer.receiveNoWait();
+        List<SourceRecord> records = null;
+        while (collectedRecords < maxBatchPollSize && remaining > 0) {
+            Exchange exchange = consumer.receive(remaining);
+            if (exchange == null) {

Review comment:
       What is the point of this check? If the exchange is `null`, then `receive` has already waited for the whole remaining time, so the time budget is used up and there is no point in updating `remaining` early and continuing.
   
   I would rather just check that the exchange is not `null` and, in that case, do all the work except updating the remaining time, i.e. something like:
   ```java
   while (...) {
       Exchange exchange = consumer.receive(remaining);
       if (exchange != null) {
           // whole code here
       }
       // updated on every iteration, whether or not an exchange arrived
       remaining = remaining(startPollEpochMilli, maxPollDuration);
   }
   ...
   ```
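
A minimal, self-contained sketch of the loop shape suggested above, in case it helps the discussion. It does not use Camel: `TimedSource`, `pollBatch` and `fromQueue` are hypothetical names, and a `BlockingQueue` stands in for the polling consumer, on the assumption that a timed receive returns `null` when the timeout elapses. Only the `remaining(...)` arithmetic is taken from the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollLoopSketch {

    // Hypothetical stand-in for the consumer: returns null if nothing arrives in time.
    interface TimedSource<T> {
        T receive(long timeoutMillis);
    }

    // Same arithmetic as the remaining(...) helper in the diff.
    static long remaining(long startMillis, long maxPollDuration) {
        return maxPollDuration - (System.currentTimeMillis() - startMillis);
    }

    static <T> List<T> pollBatch(TimedSource<T> source, long maxBatchPollSize, long maxPollDuration) {
        final long start = System.currentTimeMillis();
        long remaining = remaining(start, maxPollDuration);
        List<T> batch = new ArrayList<>();
        while (batch.size() < maxBatchPollSize && remaining > 0) {
            T item = source.receive(remaining);
            if (item != null) {
                batch.add(item);
            }
            // Single update point: runs whether or not an item arrived.
            remaining = remaining(start, maxPollDuration);
        }
        return batch;
    }

    // Adapts a BlockingQueue to the hypothetical TimedSource interface.
    static <T> TimedSource<T> fromQueue(BlockingQueue<T> queue) {
        return timeoutMillis -> {
            try {
                return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        };
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("a", "b", "c"));
        // Stops early because the batch size limit is reached.
        System.out.println(pollBatch(fromQueue(queue), 2, 200));
        // Collects what is left, then waits out the rest of the time budget.
        System.out.println(pollBatch(fromQueue(queue), 10, 200));
    }
}
```

With one update of `remaining` at the bottom of the loop, a `null` result needs no special branch: the recomputed value is zero or negative once the time budget is spent, and the loop condition ends the iteration.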

Review comment:
       I may well have overlooked something here or misunderstood your motivation, so feel free to disagree and correct me. After all, I was the author of the buggy, poorly performing previous version of this section of the codebase :P

##########
File path: core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
##########
@@ -119,53 +119,63 @@ public void start(Map<String, String> props) {
         }
     }
 
+    private long remaining(long startPollEpochMilli, long maxPollDuration)  {
+        return maxPollDuration - (Instant.now().toEpochMilli() - startPollEpochMilli);
+    }
+
+
     @Override
     public synchronized List<SourceRecord> poll() {
-        long startPollEpochMilli = Instant.now().toEpochMilli();
+        final long startPollEpochMilli = Instant.now().toEpochMilli();
+
+        long remaining = remaining(startPollEpochMilli, maxPollDuration);
         long collectedRecords = 0L;
 
-        List<SourceRecord> records = new ArrayList<>();
-        while (collectedRecords < maxBatchPollSize && (Instant.now().toEpochMilli() - startPollEpochMilli) < maxPollDuration) {
-            Exchange exchange = consumer.receiveNoWait();
+        List<SourceRecord> records = null;
+        while (collectedRecords < maxBatchPollSize && remaining > 0) {
+            Exchange exchange = consumer.receive(remaining);
+            if (exchange == null) {
+                remaining = remaining(startPollEpochMilli, maxPollDuration);
+                continue;
+            }
+
+            if (records == null) {

Review comment:
       For readability, I would rather initialize `records` with an empty list and check at the end whether it is empty, returning `null` in that case, than get a (small?) gain in performance...
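
A small sketch of the "empty list, then `null` at the end" shape described above. It is only an illustration: `EmptyBatchSketch` and `collectOrNull` are hypothetical names, and a plain `Supplier` that returns `null` when nothing is available stands in for the consumer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

public class EmptyBatchSketch {

    // Collects up to maxItems from the source; the source signals "nothing left" with null.
    static <T> List<T> collectOrNull(Supplier<T> source, int maxItems) {
        // Always start with a real list, so the loop body needs no null check.
        List<T> records = new ArrayList<>();
        while (records.size() < maxItems) {
            T item = source.get();
            if (item == null) {
                break;
            }
            records.add(item);
        }
        // Translate "nothing collected" into null in exactly one place.
        return records.isEmpty() ? null : records;
    }

    public static void main(String[] args) {
        Deque<String> pending = new ArrayDeque<>(List.of("x", "y"));
        System.out.println(collectOrNull(pending::poll, 10)); // the two queued items
        System.out.println(collectOrNull(pending::poll, 10)); // nothing left, so null
    }
}
```

The cost is one possibly unused `ArrayList` allocation per call, in exchange for dropping the lazy `records == null` check from inside the loop.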



