You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by GitBox <gi...@apache.org> on 2020/09/08 05:00:04 UTC

[GitHub] [camel-kafka-connector] orpiske commented on a change in pull request #428: Proposed fix for issue #414

orpiske commented on a change in pull request #428:
URL: https://github.com/apache/camel-kafka-connector/pull/428#discussion_r484650459



##########
File path: core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
##########
@@ -119,53 +119,63 @@ public void start(Map<String, String> props) {
         }
     }
 
+    private long remaining(long startPollEpochMilli, long maxPollDuration)  {
+        return maxPollDuration - (Instant.now().toEpochMilli() - startPollEpochMilli);
+    }
+
+
     @Override
     public synchronized List<SourceRecord> poll() {
-        long startPollEpochMilli = Instant.now().toEpochMilli();
+        final long startPollEpochMilli = Instant.now().toEpochMilli();
+
+        long remaining = remaining(startPollEpochMilli, maxPollDuration);
         long collectedRecords = 0L;
 
-        List<SourceRecord> records = new ArrayList<>();
-        while (collectedRecords < maxBatchPollSize && (Instant.now().toEpochMilli() - startPollEpochMilli) < maxPollDuration) {
-            Exchange exchange = consumer.receiveNoWait();
+        List<SourceRecord> records = null;
+        while (collectedRecords < maxBatchPollSize && remaining > 0) {
+            Exchange exchange = consumer.receive(remaining);
+            if (exchange == null) {

Review comment:
       > what is the point of this check? if the exchange is null, since you receive waiting for the whole remaining time it means you have finished the time so there is no point in updating it earlier and continue.
   
   Great catch, man! I think you are right!
   
   > 
   > I would rather just check if exchange is not `null` and in that case do all the work except updating remaining time, i.e. something like:
   > 
   > ```java
   > while(...) {
   >     Exchange exchange = consumer.receive(remaining);
   >     if(exchange != null) {
   >     //whole code here
   >     }
   >     remaining = remaining(startPollEpochMilli, maxPollDuration);
   > }
   > ...
   > ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org